#include <stdlib.h>
#include <unistd.h>
#include <xseg/xseg.h>
-#include <mpeer.h>
+#include <peer.h>
#include <rados/librados.h>
#include <xseg/protocol.h>
#define MAX_POOL_NAME 64
-#define MAX_OBJ_NAME 256
+/* Object names are truncated to this many bytes; name buffers add 1 for the NUL. */
+#define MAX_OBJ_NAME XSEG_MAX_TARGETLEN
+/*
+ * Progress of a request's asynchronous work, kept in rados_io.state.
+ * ACCEPTED: nothing submitted yet (handlers submit on this state);
+ * PENDING: the single async op of delete/info/read/write is in flight;
+ * READING / WRITING: the read and write phases of a copy.
+ */
enum rados_state {
ACCEPTED = 0,
- PENDING = 1
+ PENDING = 1,
+ READING = 2,
+ WRITING = 3
};
+/* Per-peer state (peer->priv): cluster handle, I/O context and the pool
+ * name parsed from the --pool argument in custom_peer_init. */
struct radosd {
rados_t cluster;
rados_ioctx_t ioctx;
- char pool[MAX_POOL_NAME];
+ char pool[MAX_POOL_NAME + 1];
};
+/* Per-request state (pr->priv). */
struct rados_io{
+ /* target object name, truncated to MAX_OBJ_NAME and NUL-terminated */
- char obj_name[MAX_OBJ_NAME];
+ char obj_name[MAX_OBJ_NAME + 1];
enum rados_state state;
+ /* object size filled in by rados_aio_stat in handle_info */
+ uint64_t size;
+ /* copy only: malloc'd source object name and req->size bounce buffer */
+ char *src_name, *buf;
+ /* copy only: number of source bytes already in buf */
+ uint64_t read;
+
};
void rados_ack_cb(rados_completion_t c, void *arg)
int r;
struct radosd *rados = (struct radosd *) peer->priv;
struct rados_io *rio = (struct rados_io *) pr->priv;
-
- //log_pr("delete start", pr);
+/*
+ XSEGLOG2(&lc, I, "Deleting %s", rio->obj_name);
r = rados_remove(rados->ioctx, rio->obj_name);
if (r < 0) {
pr->retval = r;
+ XSEGLOG2(&lc, E, "Deletion of %s failed", rio->obj_name);
fail(peer, pr);
}
else {
pr->retval = 0;
+ XSEGLOG2(&lc, E, "Deletion of %s completed", rio->obj_name);
complete(peer, pr);
}
+
+ return 0;
+*/
+ if (rio->state == ACCEPTED) {
+ XSEGLOG2(&lc, I, "Deleting %s", rio->obj_name);
+ rados_completion_t rados_compl;
+ r = rados_aio_create_completion(pr, rados_ack_cb, NULL, &rados_compl);
+ if (r < 0){
+ XSEGLOG2(&lc, E, "Deletion of %s failed", rio->obj_name);
+ fail(peer, pr);
+ return 0;
+ }
+ r = rados_aio_remove(rados->ioctx, rio->obj_name, rados_compl);
+ if (r < 0) {
+ rados_aio_release(rados_compl);
+ XSEGLOG2(&lc, E, "Deletion of %s failed", rio->obj_name);
+ fail(peer, pr);
+ return 0;
+ }
+ rio->state = PENDING;
+ }
+ else {
+ if (pr->retval < 0){
+ XSEGLOG2(&lc, E, "Deletion of %s failed", rio->obj_name);
+ fail(peer, pr);
+ }
+ else {
+ XSEGLOG2(&lc, E, "Deletion of %s completed", rio->obj_name);
+ complete(peer, pr);
+ }
+ }
return 0;
}
+/*
+ * X_INFO handler: reports the size of rio->obj_name.
+ * First call (state ACCEPTED): submits rados_aio_stat() with rados_ack_cb
+ * as the completion callback and moves the request to PENDING.
+ * Later call: inspects pr->retval (presumably set by the callback) and
+ * either zeroes the reply size and fails the request, or stores the stat
+ * result in the xseg_reply_info in the request data and completes it with
+ * retval = sizeof(uint64_t).
+ */
int handle_info(struct peerd *peer, struct peer_req *pr)
{
- uint64_t size;
- time_t pmtime;
int r;
struct xseg_request *req = pr->req;
struct radosd *rados = (struct radosd *) peer->priv;
struct rados_io *rio = (struct rados_io *) pr->priv;
char *req_data = xseg_get_data(peer->xseg, req);
- struct xseg_reply_info *xinfo = req_data;
-
- log_pr("info start", pr);
-
- r = rados_stat(rados->ioctx, rio->obj_name, &size, &pmtime);
- if (r < 0) {
- pr->retval = r;
- fail(peer, pr);
- }
- else {
- xinfo->size = size;
- pr->retval = sizeof(uint64_t);
- complete(peer,pr);
- }
- return 0;
+ struct xseg_reply_info *xinfo = (struct xseg_reply_info *)req_data;
+
+ if (rio->state == ACCEPTED) {
+ XSEGLOG2(&lc, I, "Getting info of %s", rio->obj_name);
+ rados_completion_t rados_compl;
+ r = rados_aio_create_completion(pr, rados_ack_cb, NULL, &rados_compl);
+ if (r < 0){
+ XSEGLOG2(&lc, E, "Getting info of %s failed", rio->obj_name);
+ fail(peer, pr);
+ return 0;
+ }
+ /* The size is written into rio (per-request state) rather than a
+ * stack local, because the stat finishes after this call returns.
+ * NOTE(review): pmtime is passed as NULL - confirm the librados
+ * version in use accepts a NULL pmtime for rados_aio_stat. */
+ r = rados_aio_stat(rados->ioctx, rio->obj_name, rados_compl, &rio->size, NULL);
+ if (r < 0) {
+ rados_aio_release(rados_compl);
+ XSEGLOG2(&lc, E, "Getting info of %s failed", rio->obj_name);
+ fail(peer, pr);
+ return 0;
+ }
+ rio->state = PENDING;
+ }
+ else {
+ if (pr->retval < 0){
+ xinfo->size = 0;
+ XSEGLOG2(&lc, E, "Getting info of %s failed", rio->obj_name);
+ fail(peer, pr);
+ }
+ else {
+ xinfo->size = rio->size;
+ pr->retval = sizeof(uint64_t);
+ XSEGLOG2(&lc, I, "Getting info of %s completed", rio->obj_name);
+ complete(peer, pr);
+ }
+ }
+ return 0;
}
-//FIXME req->state no longer apply
int handle_read(struct peerd *peer, struct peer_req *pr)
{
struct rados_io *rio = (struct rados_io *) (pr->priv);
}
//should we ensure req->op = X_READ ?
rio->state = PENDING;
- log_pr("read", pr);
+ XSEGLOG2(&lc, I, "Reading %s", rio->obj_name);
if (do_aio_read(peer, pr) < 0) {
+ XSEGLOG2(&lc, I, "Reading of %s failed on do_aio_read", rio->obj_name);
fail(peer, pr);
}
}
else if (rio->state == PENDING) {
+ XSEGLOG2(&lc, I, "Reading of %s callback", rio->obj_name);
data = xseg_get_data(peer->xseg, pr->req);
if (pr->retval > 0)
req->serviced += pr->retval;
else if (pr->retval == 0) {
+ XSEGLOG2(&lc, I, "Reading of %s reached end of file at %llu bytes. Zeroing out rest",
+ rio->obj_name, (unsigned long long) req->serviced);
/* reached end of object. zero out rest of data
* requested from this object
*/
- memset(data, 0, req->datalen - req->serviced);
+ memset(data + req->serviced, 0, req->datalen - req->serviced);
req->serviced = req->datalen ;
}
else if (pr->retval == -2) {
+ XSEGLOG2(&lc, I, "Reading of %s return -2. Zeroing out data", rio->obj_name);
/* object not found. return zeros instead */
memset(data, 0, req->datalen);
req->serviced = req->datalen;
}
else {
+ XSEGLOG2(&lc, E, "Reading of %s failed", rio->obj_name);
/* pr->retval < 0 && pr->retval != -2 */
fail(peer, pr);
return 0;
}
if (req->serviced >= req->datalen) {
- log_pr("read complete", pr);
+ XSEGLOG2(&lc, I, "Reading of %s completed", rio->obj_name);
complete(peer, pr);
return 0;
}
//TODO assert req->op == X_READ
/* resubmit */
- log_pr("read resubmit", pr);
+ XSEGLOG2(&lc, I, "Resubmitting read of %s", rio->obj_name);
if (do_aio_read(peer, pr) < 0) {
+ XSEGLOG2(&lc, E, "Reading of %s failed on do_aio_read", rio->obj_name);
fail(peer, pr);
}
}
}
//should we ensure req->op = X_READ ?
rio->state = PENDING;
- //log_pr("write", pr);
+ XSEGLOG2(&lc, I, "Writing %s", rio->obj_name);
if (do_aio_write(peer, pr) < 0) {
+ XSEGLOG2(&lc, E, "Writing of %s failed on do_aio_write", rio->obj_name);
fail(peer, pr);
}
}
/* rados writes return 0 if write succeeded or < 0 if failed
* no resubmission occurs
*/
- //log_pr("write complete", pr);
+ XSEGLOG2(&lc, I, "Writing of %s callback", rio->obj_name);
if (pr->retval == 0) {
+ XSEGLOG2(&lc, I, "Writing of %s completed", rio->obj_name);
req->serviced = req->datalen;
complete(peer, pr);
return 0;
}
else {
+ XSEGLOG2(&lc, E, "Writing of %s failed", rio->obj_name);
fail(peer, pr);
return 0;
}
+/*
+ * X_COPY handler: copies req->size bytes of the object named in the
+ * xseg_request_copy payload (source) into rio->obj_name (destination).
+ * It is a state machine on rio->state and is presumably re-entered after
+ * each asynchronous operation finishes:
+ * ACCEPTED - copy the source name, allocate a req->size bounce buffer,
+ * submit the first read and move to READING;
+ * READING - account pr->retval bytes, zero-pad at end of object, then
+ * either resubmit for the remainder or submit the write and
+ * move to WRITING;
+ * WRITING - finish according to pr->retval (0 means success).
+ * Every terminal path after allocation goes through out_buf/out_src, which
+ * free rio->buf / rio->src_name and call fail() or complete() exactly once.
+ * The commented-out block below is the former synchronous implementation.
+ */
int handle_copy(struct peerd *peer, struct peer_req *pr)
{
+ /*
struct radosd *rados = (struct radosd *) peer->priv;
struct xseg_request *req = pr->req;
struct rados_io *rio = (struct rados_io *) pr->priv;
int r, sum;
- char *buf, src_name[MAX_OBJ_NAME];
+ char *buf, src_name[MAX_OBJ_NAME + 1];
struct xseg_request_copy *xcopy = xseg_get_data(peer->xseg, req);
- unsigned int end = (xcopy->targetlen > MAX_OBJ_NAME -1 )? MAX_OBJ_NAME - 1 : xcopy->targetlen;
+ unsigned int end = (xcopy->targetlen > MAX_OBJ_NAME) ? MAX_OBJ_NAME : xcopy->targetlen;
strncpy(src_name, xcopy->target, end);
src_name[end] = 0;
fail(peer, pr);
return -1;
}
+ XSEGLOG2(&lc, I, "Copy of object %s to object %s started", src_name, rio->obj_name);
sum = 0;
do {
- r = rados_read(rados->ioctx, rio->obj_name, buf, req->size, 0);
- if (r < 0)
+ r = rados_read(rados->ioctx, src_name, buf, req->size, 0);
+ if (r < 0){
+ XSEGLOG2(&lc, E, "Read of object %s failed", src_name);
goto out_fail;
+ }
else if (r == 0) {
memset(buf+r, 0, req->size - r);
sum = req->size;
} else
sum += r;
} while (sum < req->size);
+ XSEGLOG2(&lc, D, "Read of object %s Completed", src_name);
r = rados_write_full(rados->ioctx, rio->obj_name, buf, req->size);
- if (r < 0)
+ if (r < 0){
+ XSEGLOG2(&lc, E, "Write of object %s failed", rio->obj_name);
goto out_fail;
+ }
+ free(buf);
req->serviced = req->size;
+ XSEGLOG2(&lc, I, "Copy of object %s to object %s completed", src_name, rio->obj_name);
+ complete(peer, pr);
return 0;
out_fail:
free(buf);
pr->retval = -1;
+ XSEGLOG2(&lc, E, "Copy of object %s to object %s failed", src_name, rio->obj_name);
fail(peer, pr);
return 0;
+ */
+
+ struct xseg_request *req = pr->req;
+ struct rados_io *rio = (struct rados_io *) pr->priv;
+ int r;
+
+ if (rio->state == ACCEPTED) {
+ struct xseg_request_copy *xcopy = (struct xseg_request_copy *)xseg_get_data(peer->xseg, req);
+ unsigned int end;
+
+ if (!req->size) {
+ /* nothing to copy; TODO decide whether this should fail instead */
+ complete(peer, pr);
+ return 0;
+ }
+ rio->src_name = malloc(MAX_OBJ_NAME + 1);
+ if (!rio->src_name) {
+ XSEGLOG2(&lc, E, "Copy to object %s failed: out of memory", rio->obj_name);
+ fail(peer, pr);
+ return -1;
+ }
+ /* target is given with an explicit length (targetlen): truncate to
+ * MAX_OBJ_NAME and NUL-terminate ourselves */
+ end = (xcopy->targetlen > MAX_OBJ_NAME) ? MAX_OBJ_NAME : xcopy->targetlen;
+ strncpy(rio->src_name, xcopy->target, end);
+ rio->src_name[end] = 0;
+ req->serviced = 0;
+ rio->buf = malloc(req->size);
+ if (!rio->buf) {
+ XSEGLOG2(&lc, E, "Copy of object %s to object %s failed: out of memory", rio->src_name, rio->obj_name);
+ r = -1;
+ goto out_src;
+ }
+ /* log only now: src_name is not valid before the copy above */
+ XSEGLOG2(&lc, I, "Copy of object %s to object %s started", rio->src_name, rio->obj_name);
+ rio->state = READING;
+ rio->read = 0;
+ XSEGLOG2(&lc, I, "Reading %s", rio->src_name);
+ /* NOTE(review): do_aio_read is called with 4 arguments here, 5 in the
+ * resubmit below and 2 in handle_read; confirm its real signature. */
+ if (do_aio_read(peer, pr, rio->src_name, rio->buf) < 0) {
+ XSEGLOG2(&lc, E, "Reading of %s failed on do_aio_read", rio->src_name);
+ /* fail() is issued once, at out_buf */
+ r = -1;
+ goto out_buf;
+ }
+ }
+ else if (rio->state == READING) {
+ XSEGLOG2(&lc, I, "Reading of %s callback", rio->src_name);
+ if (pr->retval > 0) {
+ rio->read += pr->retval;
+ }
+ else if (pr->retval == 0) {
+ XSEGLOG2(&lc, I, "Reading of %s reached end of file at %llu bytes. Zeroing out rest",
+ rio->src_name, (unsigned long long) rio->read);
+ /* source is shorter than req->size: pad the copy with zeros */
+ memset(rio->buf + rio->read, 0, req->size - rio->read);
+ rio->read = req->size;
+ }
+ else {
+ XSEGLOG2(&lc, E, "Reading of %s failed", rio->src_name);
+ r = -1;
+ goto out_buf;
+ }
+ if (rio->read >= req->size) {
+ XSEGLOG2(&lc, I, "Reading of %s completed", rio->src_name);
+ rio->state = WRITING;
+ XSEGLOG2(&lc, I, "Writing %s", rio->obj_name);
+ if (do_aio_write(peer, pr, rio->obj_name, rio->buf) < 0) {
+ XSEGLOG2(&lc, E, "Writing of %s failed on do_aio_write", rio->obj_name);
+ r = -1;
+ goto out_buf;
+ }
+ return 0;
+ }
+ /* short read: fetch the remainder into the tail of the buffer.
+ * NOTE(review): no source offset is passed, so unless do_aio_read
+ * derives it from state not visible here, the resubmitted read starts
+ * at the same object offset as the first one - confirm. */
+ XSEGLOG2(&lc, I, "Resubmitting read of %s", rio->src_name);
+ if (do_aio_read(peer, pr, rio->src_name, rio->buf + rio->read, req->size - rio->read) < 0) {
+ XSEGLOG2(&lc, E, "Reading of %s failed on do_aio_read", rio->src_name);
+ r = -1;
+ goto out_buf;
+ }
+ }
+ else if (rio->state == WRITING) {
+ XSEGLOG2(&lc, I, "Writing of %s callback", rio->obj_name);
+ if (pr->retval == 0) {
+ XSEGLOG2(&lc, I, "Writing of %s completed", rio->obj_name);
+ XSEGLOG2(&lc, I, "Copy of object %s to object %s completed", rio->src_name, rio->obj_name);
+ req->serviced = req->size;
+ r = 0;
+ }
+ else {
+ XSEGLOG2(&lc, E, "Writing of %s failed", rio->obj_name);
+ XSEGLOG2(&lc, E, "Copy of object %s to object %s failed", rio->src_name, rio->obj_name);
+ r = -1;
+ }
+ goto out_buf;
+ }
+ else {
+ XSEGLOG2(&lc, E, "Unknown state");
+ /* never leave the request hanging; buffers may not be valid in an
+ * unexpected state, so do not free them here */
+ fail(peer, pr);
+ }
+ return 0;
+
+out_buf:
+ free(rio->buf);
+out_src:
+ free(rio->src_name);
+ rio->buf = NULL;
+ rio->src_name = NULL;
+ rio->read = 0;
+
+ if (r < 0)
+ fail(peer, pr);
+ else
+ complete(peer, pr);
+ return 0;
}
+/*
+ * X_OPEN handler: calls rados_lock() on the request's target object,
+ * completing the request when it returns >= 0 and failing it otherwise.
+ * NOTE(review): the librados C API exposes rados_lock_exclusive /
+ * rados_lock_shared rather than a 2-argument rados_lock; presumably this is
+ * a helper defined elsewhere - confirm its locking semantics.
+ */
+int handle_open(struct peerd *peer, struct peer_req *pr)
+{
+ struct radosd *rados = (struct radosd *) peer->priv;
+ struct rados_io *rio = (struct rados_io *) (pr->priv);
+ int r;
+
+ XSEGLOG2(&lc, I, "Opening %s", rio->obj_name);
+ r = rados_lock(rados->ioctx, rio->obj_name);
+ if (r < 0){
+ XSEGLOG2(&lc, E, "Opening %s failed (%d)", rio->obj_name, r);
+ fail(peer, pr);
+ }
+ else {
+ XSEGLOG2(&lc, I, "Opening %s completed", rio->obj_name);
+ complete(peer, pr);
+ }
+ return 0;
+}
+
+
+/*
+ * X_CLOSE handler: calls rados_unlock() on the request's target object,
+ * completing the request when it returns >= 0 and failing it otherwise.
+ * NOTE(review): librados declares rados_unlock(io, oid, name, cookie); this
+ * 2-argument call only builds against a file-local helper or a librados
+ * without the lock API - confirm which one is in use.
+ */
+int handle_close(struct peerd *peer, struct peer_req *pr)
+{
+ struct radosd *rados = (struct radosd *) peer->priv;
+ struct rados_io *rio = (struct rados_io *) (pr->priv);
+ int r;
+
+ XSEGLOG2(&lc, I, "Closing %s", rio->obj_name);
+ r = rados_unlock(rados->ioctx, rio->obj_name);
+ if (r < 0){
+ XSEGLOG2(&lc, E, "Closing %s failed (%d)", rio->obj_name, r);
+ fail(peer, pr);
+ }
+ else {
+ XSEGLOG2(&lc, I, "Closing %s completed", rio->obj_name);
+ complete(peer, pr);
+ }
+ return 0;
+}
int custom_peer_init(struct peerd *peer, int argc, char *argv[])
{
perror("malloc");
return -1;
}
- //TODO this should be a parameter. maybe -r (from rados)?
- strncpy(rados->pool, "xseg", MAX_POOL_NAME);
+ rados->pool[0] = 0;
+ for (i = 0; i < argc; i++) {
+ if (!strcmp(argv[i], "--pool") && (i+1) < argc){
+ strncpy(rados->pool, argv[i+1], MAX_POOL_NAME);
+ rados->pool[MAX_POOL_NAME] = 0;
+ i += 1;
+ continue;
+ }
+ }
+ if (!rados->pool[0]){
+ free(rados);
+ return -1;
+ }
+
if (rados_create(&rados->cluster, NULL) < 0) {
printf("Rados create failed!\n");
return -1;
{
struct rados_io *rio = (struct rados_io *) (pr->priv);
char *target = xseg_get_target(peer->xseg, pr->req);
- unsigned int end = (pr->req->targetlen > MAX_OBJ_NAME -1 )? MAX_OBJ_NAME - 1 : pr->req->targetlen;
+ unsigned int end = (pr->req->targetlen > MAX_OBJ_NAME) ? MAX_OBJ_NAME : pr->req->targetlen;
strncpy(rio->obj_name, target, end);
rio->obj_name[end] = 0;
//log_pr("dispatch", pr);
else
handle_copy(peer, pr);
break;
+ case X_OPEN:
+ handle_open(peer, pr); break;
+ case X_CLOSE:
+ handle_close(peer, pr); break;
+
default:
fail(peer, pr);
}