6 #include <rados/librados.h>
7 #include <xseg/protocol.h>
// Size limits for names kept in the peer/request private state. The
// buffers below are declared one byte larger than the limit so that a
// terminating NUL always fits.
9 #define MAX_POOL_NAME 64
// Object names are capped at the xseg protocol's maximum target length.
10 #define MAX_OBJ_NAME XSEG_MAX_TARGETLEN
// NOTE(review): the enclosing struct definitions for the fields below are
// not visible in this excerpt. `pool` is accessed as rados->pool (struct
// radosd, stored in peer->priv). It is filled from the --pool argument in
// custom_peer_init.
22 char pool[MAX_POOL_NAME + 1];
// Per-request fields (struct rados_io, stored in peer_req->priv):
// obj_name is the NUL-terminated, possibly truncated copy of the request
// target made in dispatch(). state is the handler state-machine position;
// this file uses ACCEPTED, PENDING, READING and WRITING.
26 char obj_name[MAX_OBJ_NAME + 1];
27 enum rados_state state;
// Completion "ack" callback for asynchronous RADOS operations.
// do_aio_generic registers it for the read, remove and stat requests,
// passing the owning peer_req as `arg`. It fetches the operation's return
// value and re-enters dispatch() with reason dispatch_internal, so the
// per-op handler can advance its state machine.
// NOTE(review): the lines between fetching `ret` and dispatching are not
// visible in this excerpt. Presumably `ret` is stored in pr->retval, since
// the handlers inspect pr->retval after the callback - confirm.
34 void rados_ack_cb(rados_completion_t c, void *arg)
36 struct peer_req *pr = (struct peer_req*) arg;
37 struct peerd *peer = pr->peer;
38 int ret = rados_aio_get_return_value(c);
41 dispatch(peer, pr, pr->req, dispatch_internal);
// Completion "commit" callback for asynchronous RADOS writes.
// do_aio_generic registers it in the commit slot (with a NULL ack
// callback) for write requests, passing the owning peer_req as `arg`. It
// fetches the operation's return value and re-enters dispatch() with
// reason dispatch_internal.
// NOTE(review): as in rados_ack_cb, the lines that consume `ret` are not
// visible here. Presumably it is stored in pr->retval - confirm.
44 void rados_commit_cb(rados_completion_t c, void *arg)
46 struct peer_req *pr = (struct peer_req*) arg;
47 struct peerd *peer = pr->peer;
48 int ret = rados_aio_get_return_value(c);
51 dispatch(peer, pr, pr->req, dispatch_internal);
// Submit one asynchronous RADOS operation on object `target` on behalf of
// peer request `pr`.
//   op     - xseg op code; callers pass X_READ, X_WRITE, X_DELETE or X_INFO
//   buf    - destination/source buffer (callers pass NULL for delete/info)
//   size   - byte count for read/write
//   offset - object offset for read/write
// Each op creates a completion whose callback argument is `pr`:
//   - read, remove and stat register rados_ack_cb;
//   - write registers rados_commit_cb.
// Stat stores the object size in rio->size. Callers treat a negative
// return value as a failure to submit.
// NOTE(review): the per-op case labels and error checks are not visible in
// this excerpt. On the visible error path the completion is released.
54 static int do_aio_generic(struct peerd *peer, struct peer_req *pr, uint32_t op,
55 char *target, char *buf, uint64_t size, uint64_t offset)
57 struct radosd *rados = (struct radosd *) peer->priv;
58 struct rados_io *rio = (struct rados_io *) pr->priv;
61 rados_completion_t rados_compl;
64 r = rados_aio_create_completion(pr, rados_ack_cb, NULL, &rados_compl);
67 r = rados_aio_read(rados->ioctx, target, rados_compl,
// Writes use the commit slot (ack slot NULL), unlike the other ops.
71 r = rados_aio_create_completion(pr, NULL, rados_commit_cb, &rados_compl);
74 r = rados_aio_write(rados->ioctx, target, rados_compl,
78 r = rados_aio_create_completion(pr, rados_ack_cb, NULL, &rados_compl);
81 r = rados_aio_remove(rados->ioctx, target, rados_compl);
84 r = rados_aio_create_completion(pr, rados_ack_cb, NULL, &rados_compl);
// The stat result size lands in rio->size, which handle_info reports.
87 r = rados_aio_stat(rados->ioctx, target, rados_compl, &rio->size, NULL);
94 rados_aio_release(rados_compl);
// Submit (or resubmit) an asynchronous read of the request's object.
// The read goes into the request data buffer just past the bytes already
// serviced. It asks for the remaining req->size - req->serviced bytes at
// object offset req->offset + req->serviced, so a short read can be
// continued by calling this again. Returns do_aio_generic()'s result;
// callers treat < 0 as a submit failure.
99 static int do_aio_read(struct peerd *peer, struct peer_req *pr)
101 struct xseg_request *req = pr->req;
102 struct rados_io *rio = (struct rados_io *) pr->priv;
103 char *data = xseg_get_data(peer->xseg, pr->req);
105 return do_aio_generic(peer, pr, X_READ, rio->obj_name,
106 data + req->serviced,
107 req->size - req->serviced,
108 req->offset + req->serviced);
// Submit an asynchronous write of the request's data to its object.
// The write covers the not-yet-serviced part of the request data buffer:
// req->size - req->serviced bytes, taken from data + req->serviced and
// written at object offset req->offset + req->serviced. Returns
// do_aio_generic()'s result; callers treat < 0 as a submit failure.
111 static int do_aio_write(struct peerd *peer, struct peer_req *pr)
113 struct xseg_request *req = pr->req;
114 struct rados_io *rio = (struct rados_io *) pr->priv;
115 char *data = xseg_get_data(peer->xseg, pr->req);
118 return do_aio_generic(peer, pr, X_WRITE, rio->obj_name,
119 data + req->serviced,
120 req->size - req->serviced,
121 req->offset + req->serviced);
// Handle a delete request for rio->obj_name.
// First dispatch (state ACCEPTED): move to PENDING and submit an
// asynchronous remove via do_aio_generic(X_DELETE). Its completion
// (rados_ack_cb) re-enters this handler through dispatch().
// The two failure logs presumably cover a submit failure and a failed
// completion, and the last log covers successful completion. Their
// guarding conditions are not visible in this excerpt.
// NOTE(review): `rados` is not used in the visible lines.
124 int handle_delete(struct peerd *peer, struct peer_req *pr)
127 struct radosd *rados = (struct radosd *) peer->priv;
128 struct rados_io *rio = (struct rados_io *) pr->priv;
130 if (rio->state == ACCEPTED) {
131 XSEGLOG2(&lc, I, "Deleting %s", rio->obj_name);
132 rio->state = PENDING;
133 r = do_aio_generic(peer, pr, X_DELETE, rio->obj_name, NULL, 0, 0);
135 XSEGLOG2(&lc, E, "Deletion of %s failed", rio->obj_name);
141 XSEGLOG2(&lc, E, "Deletion of %s failed", rio->obj_name);
145 XSEGLOG2(&lc, I, "Deletion of %s completed", rio->obj_name);
// Handle an info request: report the size of rio->obj_name.
// First dispatch (state ACCEPTED): move to PENDING and submit an
// asynchronous stat via do_aio_generic(X_INFO), which stores the size in
// rio->size.
// On successful completion, the size is written into the xseg_reply_info
// at the start of the request data buffer, and pr->retval is set to
// sizeof(uint64_t).
// NOTE(review): pr->retval is sizeof(uint64_t), not
// sizeof(struct xseg_reply_info); confirm the intended reply length.
// NOTE(review): `rados` is not used in the visible lines.
152 int handle_info(struct peerd *peer, struct peer_req *pr)
155 struct xseg_request *req = pr->req;
156 struct radosd *rados = (struct radosd *) peer->priv;
157 struct rados_io *rio = (struct rados_io *) pr->priv;
158 char *req_data = xseg_get_data(peer->xseg, req);
159 struct xseg_reply_info *xinfo = (struct xseg_reply_info *)req_data;
161 if (rio->state == ACCEPTED) {
162 XSEGLOG2(&lc, I, "Getting info of %s", rio->obj_name);
163 rio->state = PENDING;
164 r = do_aio_generic(peer, pr, X_INFO, rio->obj_name, NULL, 0, 0);
166 XSEGLOG2(&lc, E, "Getting info of %s failed", rio->obj_name);
173 XSEGLOG2(&lc, E, "Getting info of %s failed", rio->obj_name);
177 xinfo->size = rio->size;
178 pr->retval = sizeof(uint64_t);
179 XSEGLOG2(&lc, I, "Getting info of %s completed", rio->obj_name);
// Handle a read request for rio->obj_name into the request data buffer.
// ACCEPTED: switch to READING and submit do_aio_read().
// READING (re-dispatch after the completion callback), by pr->retval:
//   > 0 (presumably; the guard line is not visible): advance req->serviced
//   == 0: end of object; zero-fill the rest of the buffer up to datalen
//   == -2 (the original comment says "object not found", presumably
//         -ENOENT): return an all-zero buffer
//   other negative values: fail
// If req->serviced reaches req->datalen the read is complete. Otherwise
// the remainder is resubmitted with do_aio_read().
// NOTE(review): completion is measured against req->datalen, while
// do_aio_read() sizes reads from req->size. If the two can differ, confirm
// that resubmission cannot loop on zero-length reads.
// NOTE(review): -2 is a magic number; -ENOENT would state the intent.
// NOTE(review): the trailing printf reports an unexpected state on stdout.
// XSEGLOG2 would keep it in the peer log.
186 int handle_read(struct peerd *peer, struct peer_req *pr)
188 struct rados_io *rio = (struct rados_io *) (pr->priv);
189 struct xseg_request *req = pr->req;
191 if (rio->state == ACCEPTED) {
196 rio->state = READING;
197 XSEGLOG2(&lc, I, "Reading %s", rio->obj_name);
198 if (do_aio_read(peer, pr) < 0) {
// NOTE(review): a submit failure is logged at level I here, but at level E
// on the resubmit path further down.
199 XSEGLOG2(&lc, I, "Reading of %s failed on do_aio_read",
204 else if (rio->state == READING) {
205 XSEGLOG2(&lc, I, "Reading of %s callback", rio->obj_name);
206 data = xseg_get_data(peer->xseg, pr->req);
208 req->serviced += pr->retval;
209 else if (pr->retval == 0) {
210 XSEGLOG2(&lc, I, "Reading of %s reached end of file at "
211 "%llu bytes. Zeroing out rest", rio->obj_name,
212 (unsigned long long) req->serviced);
213 /* reached end of object. zero out rest of data
214 * requested from this object
216 memset(data + req->serviced, 0, req->datalen - req->serviced);
217 req->serviced = req->datalen;
219 else if (pr->retval == -2) {
220 XSEGLOG2(&lc, I, "Reading of %s return -2. "
221 "Zeroing out data", rio->obj_name);
222 /* object not found. return zeros instead */
223 memset(data, 0, req->datalen);
224 req->serviced = req->datalen;
227 XSEGLOG2(&lc, E, "Reading of %s failed", rio->obj_name);
228 /* pr->retval < 0 && pr->retval != -2 */
232 if (req->serviced >= req->datalen) {
233 XSEGLOG2(&lc, I, "Reading of %s completed", rio->obj_name);
239 /* should not happen */
244 XSEGLOG2(&lc, I, "Resubmitting read of %s", rio->obj_name);
245 if (do_aio_read(peer, pr) < 0) {
246 XSEGLOG2(&lc, E, "Reading of %s failed on do_aio_read",
252 /* should not reach this */
253 printf("read request reached this\n");
// Handle a write request for rio->obj_name from the request data buffer.
// ACCEPTED: requests carrying XF_FLUSH presumably take a separate path (its
// body is not visible in this excerpt). Otherwise the state switches to
// WRITING and do_aio_write() is submitted.
// WRITING (re-dispatch after the commit callback): pr->retval == 0 marks
// the whole request serviced (req->serviced = req->datalen). Any other
// value is a failure. Writes are never resubmitted.
// NOTE(review): success sets serviced to req->datalen, while the submitted
// write length was req->size - req->serviced; confirm these agree.
// NOTE(review): the trailing printf reports an unexpected state on stdout.
// XSEGLOG2 would keep it in the peer log.
259 int handle_write(struct peerd *peer, struct peer_req *pr)
261 struct rados_io *rio = (struct rados_io *) (pr->priv);
262 struct xseg_request *req = pr->req;
263 if (rio->state == ACCEPTED) {
266 if (req->flags & XF_FLUSH) {
// NOTE(review): this is the write path, so the check asked about below
// would presumably be req->op == X_WRITE rather than X_READ.
275 //should we ensure req->op = X_READ ?
276 rio->state = WRITING;
277 XSEGLOG2(&lc, I, "Writing %s", rio->obj_name);
278 if (do_aio_write(peer, pr) < 0) {
279 XSEGLOG2(&lc, E, "Writing of %s failed on do_aio_write",
284 else if (rio->state == WRITING) {
285 /* rados writes return 0 if write succeeded or < 0 if failed
286 * no resubmission occurs
288 XSEGLOG2(&lc, I, "Writing of %s callback", rio->obj_name);
289 if (pr->retval == 0) {
290 XSEGLOG2(&lc, I, "Writing of %s completed", rio->obj_name);
291 req->serviced = req->datalen;
296 XSEGLOG2(&lc, E, "Writing of %s failed", rio->obj_name);
302 /* should not reach this */
303 printf("write request reached this\n");
// Handle a copy request: buffer req->size bytes of the source object named
// in the xseg_request_copy payload, then write them to rio->obj_name.
// Reads start at req->offset + rio->read in the source; the single write
// goes to req->offset in the destination.
// ACCEPTED: copy the source name (truncated to MAX_OBJ_NAME) into a newly
// allocated rio->src_name, allocate rio->buf (req->size bytes), switch to
// READING and submit the first read.
// READING: add pr->retval to rio->read, or zero-fill the rest at end of
// object. Once req->size bytes are buffered, switch to WRITING and submit
// one write; otherwise resubmit the remaining read.
// WRITING: pr->retval == 0 completes the copy (req->serviced = req->size).
// The final visible line resets rio->src_name to NULL, presumably after a
// free; the cleanup lines are not fully visible.
// NOTE(review): the "started" log reads rio->src_name before it is
// assigned further down, so it prints whatever the field held before.
// After the reset at the end of this function that is NULL, and passing
// NULL for %s is undefined behaviour.
// NOTE(review): no NULL checks are visible for the two malloc calls.
// Resetting rio->read before the first read is not visible either.
// NOTE(review): several read-phase logs name rio->obj_name (the
// destination), although the read is of rio->src_name. The first submit
// failure is logged at level I. The end-of-file log prints req->serviced
// instead of the copy progress in rio->read.
309 int handle_copy(struct peerd *peer, struct peer_req *pr)
311 struct radosd *rados = (struct radosd *) peer->priv;
312 struct xseg_request *req = pr->req;
313 struct rados_io *rio = (struct rados_io *) pr->priv;
315 struct xseg_request_copy *xcopy = (struct xseg_request_copy *)xseg_get_data(peer->xseg, req);
317 if (rio->state == ACCEPTED){
318 XSEGLOG2(&lc, I, "Copy of object %s to object %s started",
319 rio->src_name, rio->obj_name);
321 complete(peer, pr); //or fail?
325 rio->src_name = malloc(MAX_OBJ_NAME + 1);
330 //NULL terminate or fail if targetlen > MAX_OBJ_NAME ?
331 unsigned int end = (xcopy->targetlen > MAX_OBJ_NAME) ? MAX_OBJ_NAME : xcopy->targetlen;
332 strncpy(rio->src_name, xcopy->target, end);
333 rio->src_name[end] = 0;
// Staging buffer holding the whole copied range between the read and
// write phases.
335 rio->buf = malloc(req->size);
341 rio->state = READING;
343 XSEGLOG2(&lc, I, "Reading %s", rio->src_name);
344 if (do_aio_generic(peer, pr, X_READ, rio->src_name, rio->buf + rio->read,
345 req->size - rio->read, req->offset + rio->read) < 0) {
346 XSEGLOG2(&lc, I, "Reading of %s failed on do_aio_read", rio->obj_name);
352 else if (rio->state == READING){
353 XSEGLOG2(&lc, I, "Reading of %s callback", rio->obj_name);
355 rio->read += pr->retval;
356 else if (pr->retval == 0) {
357 XSEGLOG2(&lc, I, "Reading of %s reached end of file at "
358 "%llu bytes. Zeroing out rest", rio->obj_name,
359 (unsigned long long) req->serviced);
360 memset(rio->buf + rio->read, 0, req->size - rio->read);
361 rio->read = req->size ;
364 XSEGLOG2(&lc, E, "Reading of %s failed", rio->src_name);
369 if (rio->read >= req->size) {
370 XSEGLOG2(&lc, I, "Reading of %s completed", rio->obj_name);
372 rio->state = WRITING;
373 XSEGLOG2(&lc, I, "Writing %s", rio->obj_name);
374 if (do_aio_generic(peer, pr, X_WRITE, rio->obj_name,
375 rio->buf, req->size, req->offset) < 0) {
376 XSEGLOG2(&lc, E, "Writing of %s failed on do_aio_write", rio->obj_name);
383 XSEGLOG2(&lc, I, "Resubmitting read of %s", rio->obj_name);
384 if (do_aio_generic(peer, pr, X_READ, rio->src_name, rio->buf + rio->read,
385 req->size - rio->read, req->offset + rio->read) < 0) {
386 XSEGLOG2(&lc, E, "Reading of %s failed on do_aio_read",
392 else if (rio->state == WRITING){
393 XSEGLOG2(&lc, I, "Writing of %s callback", rio->obj_name);
394 if (pr->retval == 0) {
395 XSEGLOG2(&lc, I, "Writing of %s completed", rio->obj_name);
396 XSEGLOG2(&lc, I, "Copy of object %s to object %s completed", rio->src_name, rio->obj_name);
397 req->serviced = req->size;
402 XSEGLOG2(&lc, E, "Writing of %s failed", rio->obj_name);
403 XSEGLOG2(&lc, E, "Copy of object %s to object %s failed", rio->src_name, rio->obj_name);
409 XSEGLOG2(&lc, E, "Unknown state");
420 rio->src_name = NULL;
// Handle an open request by taking a lock on rio->obj_name through
// rados_lock(). The handling of `r` is not visible in this excerpt.
// NOTE(review): rados_lock is not declared in the visible lines. It is
// presumably a helper defined elsewhere in this file; confirm.
430 int handle_open(struct peerd *peer, struct peer_req *pr)
432 struct radosd *rados = (struct radosd *) peer->priv;
433 struct rados_io *rio = (struct rados_io *) (pr->priv);
434 int r = rados_lock(rados->ioctx, rio->obj_name);
// Handle a close request by releasing the lock on rio->obj_name through
// rados_unlock(). The handling of `r` is not visible in this excerpt.
// NOTE(review): newer librados.h releases declare their own rados_unlock()
// taking (ioctx, oid, name, cookie). If this file defines a two-argument
// helper with the same name, the two would conflict. Confirm against the
// librados version in use.
445 int handle_close(struct peerd *peer, struct peer_req *pr)
447 struct radosd *rados = (struct radosd *) peer->priv;
448 struct rados_io *rio = (struct rados_io *) (pr->priv);
449 int r = rados_unlock(rados->ioctx, rio->obj_name);
// Peer-specific initialisation for the RADOS peer. In order, it:
//   - allocates the struct radosd later stored in peer->priv;
//   - takes the pool name from "--pool <name>", truncated to MAX_POOL_NAME;
//   - creates a RADOS cluster handle, reads configuration with
//     rados_conf_read_file(cluster, NULL), and connects;
//   - tries to create the pool if rados_pool_lookup() returns < 0;
//   - opens an I/O context on the pool;
//   - allocates one struct rados_io per peer op slot
//     (peer->peer_reqs[i].priv).
// NOTE(review): rados comes from malloc() and is uninitialised. Unless a
// line not visible here clears rados->pool, the "Pool must be provided"
// check reads indeterminate memory when --pool is absent. No NULL check on
// that malloc is visible either.
// NOTE(review): the conf-read failure path shows no rados_shutdown(),
// unlike the connect, pool-create and ioctx failure paths.
459 int custom_peer_init(struct peerd *peer, int argc, char *argv[])
462 struct radosd *rados = malloc(sizeof(struct radosd));
463 struct rados_io *rio;
469 for (i = 0; i < argc; i++) {
470 if (!strcmp(argv[i], "--pool") && (i+1) < argc){
471 strncpy(rados->pool, argv[i+1], MAX_POOL_NAME);
472 rados->pool[MAX_POOL_NAME] = 0;
477 if (!rados->pool[0]){
478 XSEGLOG2(&lc, E , "Pool must be provided");
483 if (rados_create(&rados->cluster, NULL) < 0) {
484 XSEGLOG2(&lc, E, "Rados create failed!");
487 if (rados_conf_read_file(rados->cluster, NULL) < 0){
488 XSEGLOG2(&lc, E, "Error reading rados conf files!");
491 if (rados_connect(rados->cluster) < 0) {
492 XSEGLOG2(&lc, E, "Rados connect failed!");
493 rados_shutdown(rados->cluster);
497 if (rados_pool_lookup(rados->cluster, rados->pool) < 0) {
498 XSEGLOG2(&lc, I, "Pool does not exists. I will try to create it");
499 if (rados_pool_create(rados->cluster, rados->pool) < 0){
500 XSEGLOG2(&lc, E, "Couldn't create pool %s", rados->pool);
501 rados_shutdown(rados->cluster);
505 XSEGLOG2(&lc, I, "Pool created.");
507 if (rados_ioctx_create(rados->cluster, rados->pool, &(rados->ioctx)) < 0){
508 XSEGLOG2(&lc, E, "ioctx create problem.");
509 rados_shutdown(rados->cluster);
513 peer->priv = (void *) rados;
514 for (i = 0; i < peer->nr_ops; i++) {
515 rio = malloc(sizeof(struct rados_io));
518 //is this really necessary?
// NOTE(review): presumably the allocation-failure path. It frees the rio
// slots allocated so far, but the ioctx, the cluster handle and `rados`
// are not visibly released.
519 for (j = 0; j < i; j++) {
520 free(peer->peer_reqs[j].priv);
526 peer->peer_reqs[i].priv = (void *) rio;
531 // nothing to do here for now
// Hook for peer-specific command-line parsing. The RADOS peer reads its
// own option (--pool) in custom_peer_init instead. The body is not
// visible in this excerpt.
532 int custom_arg_parse(int argc, const char *argv[])
// Route a peer request to its per-op handler.
// It is called with reason dispatch_internal from the RADOS completion
// callbacks, and with dispatch_accept presumably by the peer framework
// for newly accepted requests.
// On every call, the request target is copied into rio->obj_name,
// truncated to MAX_OBJ_NAME bytes and explicitly NUL-terminated. The
// explicit terminator suggests the xseg target is length-delimited.
// A newly accepted request has its state reset to ACCEPTED. The request
// is then routed by op to handle_read, handle_write, handle_delete,
// handle_info, handle_copy, handle_open or handle_close.
// NOTE(review): these details are not visible in this excerpt: the case
// labels, and the conditions that choose defer_request() over the
// delete/info/copy handlers.
// NOTE(review): the `req` parameter is unused in the visible lines, which
// read pr->req instead. obj_name is re-copied on every internal
// re-dispatch, although the target is presumably unchanged.
537 int dispatch(struct peerd *peer, struct peer_req *pr, struct xseg_request *req,
538 enum dispatch_reason reason)
540 struct rados_io *rio = (struct rados_io *) (pr->priv);
541 char *target = xseg_get_target(peer->xseg, pr->req);
542 unsigned int end = (pr->req->targetlen > MAX_OBJ_NAME) ? MAX_OBJ_NAME : pr->req->targetlen;
543 strncpy(rio->obj_name, target, end);
544 rio->obj_name[end] = 0;
545 //log_pr("dispatch", pr);
546 if (reason == dispatch_accept)
547 rio->state = ACCEPTED;
549 switch (pr->req->op){
551 handle_read(peer, pr); break;
553 handle_write(peer, pr); break;
556 defer_request(peer, pr);
558 handle_delete(peer, pr);
562 defer_request(peer, pr);
564 handle_info(peer, pr);
568 defer_request(peer, pr);
570 handle_copy(peer, pr);
573 handle_open(peer, pr); break;
575 handle_close(peer, pr); break;