6 #include <rados/librados.h>
7 #include <xseg/protocol.h>
/* Longest pool name kept, not counting the NUL terminator (buffers are sized + 1). */
10 #define MAX_POOL_NAME 64
/* Longest object name kept, not counting the NUL terminator; follows the xseg target length limit. */
11 #define MAX_OBJ_NAME XSEG_MAX_TARGETLEN
/* Lock name handed to rados_lock, rados_unlock and rados_break_lock in this file. */
12 #define RADOS_LOCK_NAME "RadosLock"
13 //#define RADOS_LOCK_COOKIE "Cookie"
/* Lock cookie handed to rados_lock in lock_op. */
14 #define RADOS_LOCK_COOKIE "foo"
/*
 * Writes the help text for the options specific to this peer to stderr.
 * The only option visible in this excerpt is --pool.
 */
16 void custom_peer_usage()
18 fprintf(stderr, "Custom peer options:\n"
19 "--pool: Rados pool to connect\n"
/* NUL-terminated RADOS pool name; custom_peer_init fills it from the --pool argument. */
33 char pool[MAX_POOL_NAME + 1];
/* NUL-terminated object name; dispatch() copies the request target here, cut to MAX_OBJ_NAME. */
37 char obj_name[MAX_OBJ_NAME + 1];
/* Position of the request in its handler (values used in this file: ACCEPTED, PENDING, READING, WRITING). */
38 enum rados_state state;
/* Handle written by rados_watch() and later given to rados_unwatch() in lock_op. */
42 uint64_t watch_handle;
/*
 * Completion callback used for reads, removes and stats: do_aio_generic
 * registers it as the first callback of rados_aio_create_completion().
 *
 * c   - the completion that fired; its result is read into ret.
 * arg - the struct peer_req supplied when the completion was created.
 *
 * Hands the request back to dispatch() with reason dispatch_internal so the
 * per-op handler can continue from its current state.
 * NOTE(review): the lines that consume ret are not visible in this excerpt;
 * presumably it ends up in pr->retval, which the handlers test - confirm.
 */
48 void rados_ack_cb(rados_completion_t c, void *arg)
50 struct peer_req *pr = (struct peer_req*) arg;
51 struct peerd *peer = pr->peer;
52 int ret = rados_aio_get_return_value(c);
55 dispatch(peer, pr, pr->req, dispatch_internal);
/*
 * Completion callback used for writes: do_aio_generic registers it as the
 * second callback of rados_aio_create_completion(), leaving the first NULL.
 *
 * c   - the completion that fired; its result is read into ret.
 * arg - the struct peer_req supplied when the completion was created.
 *
 * Hands the request back to dispatch() with reason dispatch_internal.
 * NOTE(review): the lines that consume ret are not visible in this excerpt;
 * presumably it ends up in pr->retval, which handle_write tests - confirm.
 */
58 void rados_commit_cb(rados_completion_t c, void *arg)
60 struct peer_req *pr = (struct peer_req*) arg;
61 struct peerd *peer = pr->peer;
62 int ret = rados_aio_get_return_value(c);
65 dispatch(peer, pr, pr->req, dispatch_internal);
/*
 * Creates a completion bound to pr and submits one asynchronous RADOS
 * operation on the object named target.
 *
 * peer   - peer whose priv is the struct radosd holding the ioctx.
 * pr     - request passed to the completion callback; its priv (struct
 *          rados_io) receives the object size for the stat operation.
 * op     - operation selector; callers in this file pass X_READ, X_WRITE,
 *          X_DELETE and X_INFO. The selecting statements are not visible in
 *          this excerpt.
 * target - object name.
 * buf, size, offset - data buffer and range; presumably forwarded to the
 *          read and write calls, whose argument lists are cut off here.
 *
 * NOTE(review): the return statement is not visible; callers treat a
 * negative result as a submission failure.
 */
68 static int do_aio_generic(struct peerd *peer, struct peer_req *pr, uint32_t op,
69 char *target, char *buf, uint64_t size, uint64_t offset)
71 struct radosd *rados = (struct radosd *) peer->priv;
72 struct rados_io *rio = (struct rados_io *) pr->priv;
75 rados_completion_t rados_compl;
/* Read: notified through rados_ack_cb. */
78 r = rados_aio_create_completion(pr, rados_ack_cb, NULL, &rados_compl);
81 r = rados_aio_read(rados->ioctx, target, rados_compl,
/* Write: notified through rados_commit_cb only. */
85 r = rados_aio_create_completion(pr, NULL, rados_commit_cb, &rados_compl);
88 r = rados_aio_write(rados->ioctx, target, rados_compl,
/* Remove: notified through rados_ack_cb. */
92 r = rados_aio_create_completion(pr, rados_ack_cb, NULL, &rados_compl);
95 r = rados_aio_remove(rados->ioctx, target, rados_compl);
/* Stat: the object size is written to rio->size; the mtime output is not requested. */
98 r = rados_aio_create_completion(pr, rados_ack_cb, NULL, &rados_compl);
101 r = rados_aio_stat(rados->ioctx, target, rados_compl, &rio->size, NULL);
/* NOTE(review): the condition guarding this release is not visible in this excerpt - confirm when it runs. */
108 rados_aio_release(rados_compl);
/*
 * Submits an asynchronous read for the part of the request that has not been
 * serviced yet.
 *
 * The object is rio->obj_name; the destination is the request data area
 * advanced by req->serviced, the length is req->size - req->serviced and the
 * object offset is req->offset + req->serviced.
 *
 * Returns whatever do_aio_generic returns.
 */
113 static int do_aio_read(struct peerd *peer, struct peer_req *pr)
115 struct xseg_request *req = pr->req;
116 struct rados_io *rio = (struct rados_io *) pr->priv;
117 char *data = xseg_get_data(peer->xseg, pr->req);
119 return do_aio_generic(peer, pr, X_READ, rio->obj_name,
120 data + req->serviced,
121 req->size - req->serviced,
122 req->offset + req->serviced);
/*
 * Submits an asynchronous write for the part of the request that has not been
 * serviced yet.
 *
 * The object is rio->obj_name; the source is the request data area advanced
 * by req->serviced, the length is req->size - req->serviced and the object
 * offset is req->offset + req->serviced.
 *
 * Returns whatever do_aio_generic returns.
 */
125 static int do_aio_write(struct peerd *peer, struct peer_req *pr)
127 struct xseg_request *req = pr->req;
128 struct rados_io *rio = (struct rados_io *) pr->priv;
129 char *data = xseg_get_data(peer->xseg, pr->req);
131 return do_aio_generic(peer, pr, X_WRITE, rio->obj_name,
132 data + req->serviced,
133 req->size - req->serviced,
134 req->offset + req->serviced);
/*
 * Handler for object deletion, entered once on acceptance and again when the
 * asynchronous remove finishes.
 *
 * In state ACCEPTED it logs the deletion, moves to PENDING and submits an
 * X_DELETE through do_aio_generic. The later branch logs either failure or
 * completion.
 * NOTE(review): the conditions selecting between the failure and completion
 * messages, and the calls that finish the request, are not visible in this
 * excerpt.
 */
137 int handle_delete(struct peerd *peer, struct peer_req *pr)
140 //struct radosd *rados = (struct radosd *) peer->priv;
141 struct rados_io *rio = (struct rados_io *) pr->priv;
/* First dispatch: start the asynchronous remove. */
143 if (rio->state == ACCEPTED) {
144 XSEGLOG2(&lc, I, "Deleting %s", rio->obj_name);
145 rio->state = PENDING;
146 r = do_aio_generic(peer, pr, X_DELETE, rio->obj_name, NULL, 0, 0);
148 XSEGLOG2(&lc, E, "Deletion of %s failed", rio->obj_name);
154 XSEGLOG2(&lc, E, "Deletion of %s failed", rio->obj_name);
158 XSEGLOG2(&lc, I, "Deletion of %s completed", rio->obj_name);
/*
 * Handler for object info (size) requests, entered once on acceptance and
 * again when the asynchronous stat finishes.
 *
 * In state ACCEPTED it logs, moves to PENDING and submits an X_INFO through
 * do_aio_generic, which stats the object into rio->size. On the successful
 * later pass it copies rio->size into the xseg_reply_info located at the
 * start of the request data and sets pr->retval to sizeof(uint64_t).
 * NOTE(review): the conditions separating the failure and success paths are
 * not visible in this excerpt.
 */
165 int handle_info(struct peerd *peer, struct peer_req *pr)
168 struct xseg_request *req = pr->req;
169 //struct radosd *rados = (struct radosd *) peer->priv;
170 struct rados_io *rio = (struct rados_io *) pr->priv;
171 char *req_data = xseg_get_data(peer->xseg, req);
/* The reply structure is overlaid on the request data area. */
172 struct xseg_reply_info *xinfo = (struct xseg_reply_info *)req_data;
/* First dispatch: start the asynchronous stat. */
174 if (rio->state == ACCEPTED) {
175 XSEGLOG2(&lc, I, "Getting info of %s", rio->obj_name);
176 rio->state = PENDING;
177 r = do_aio_generic(peer, pr, X_INFO, rio->obj_name, NULL, 0, 0);
179 XSEGLOG2(&lc, E, "Getting info of %s failed", rio->obj_name);
186 XSEGLOG2(&lc, E, "Getting info of %s failed", rio->obj_name);
/* Publish the size obtained by the stat to the requester. */
190 xinfo->size = rio->size;
191 pr->retval = sizeof(uint64_t);
192 XSEGLOG2(&lc, I, "Getting info of %s completed", rio->obj_name);
/*
 * Handler for read requests, entered once on acceptance and again after each
 * asynchronous read finishes.
 *
 * ACCEPTED: moves to READING and submits the first read with do_aio_read.
 * READING:  interprets pr->retval from the finished read:
 *             - one branch adds it to req->serviced (its condition is not
 *               visible here; presumably retval > 0 - confirm),
 *             - 0 means end of object: the rest of the data area up to
 *               req->datalen is zero-filled and counted as serviced,
 *             - -2 means the object was not found: the whole data area is
 *               zero-filled and counted as serviced,
 *             - any other negative value is logged as a failure.
 *           When req->serviced reaches req->datalen the read is logged as
 *           completed, otherwise the remainder is resubmitted.
 * Any other state is unexpected and only reported with printf.
 * NOTE(review): -2 is presumably the negated ENOENT returned by librados -
 * confirm. The calls that complete or fail the request are not visible in
 * this excerpt.
 */
199 int handle_read(struct peerd *peer, struct peer_req *pr)
201 struct rados_io *rio = (struct rados_io *) (pr->priv);
202 struct xseg_request *req = pr->req;
/* First dispatch: start the asynchronous read. */
204 if (rio->state == ACCEPTED) {
209 rio->state = READING;
210 XSEGLOG2(&lc, I, "Reading %s", rio->obj_name);
211 if (do_aio_read(peer, pr) < 0) {
212 XSEGLOG2(&lc, I, "Reading of %s failed on do_aio_read",
/* Re-dispatch after a read finished: pr->retval carries its result. */
217 else if (rio->state == READING) {
218 XSEGLOG2(&lc, I, "Reading of %s callback", rio->obj_name);
219 data = xseg_get_data(peer->xseg, pr->req);
221 req->serviced += pr->retval;
222 else if (pr->retval == 0) {
223 XSEGLOG2(&lc, I, "Reading of %s reached end of file at "
224 "%llu bytes. Zeroing out rest", rio->obj_name,
225 (unsigned long long) req->serviced);
226 /* reached end of object. zero out rest of data
227 * requested from this object
229 memset(data + req->serviced, 0, req->datalen - req->serviced);
230 req->serviced = req->datalen;
232 else if (pr->retval == -2) {
233 XSEGLOG2(&lc, I, "Reading of %s return -2. "
234 "Zeroing out data", rio->obj_name);
235 /* object not found. return zeros instead */
236 memset(data, 0, req->datalen);
237 req->serviced = req->datalen;
240 XSEGLOG2(&lc, E, "Reading of %s failed", rio->obj_name);
241 /* pr->retval < 0 && pr->retval != -2 */
/* Done once the whole data area is accounted for; otherwise read the remainder. */
245 if (req->serviced >= req->datalen) {
246 XSEGLOG2(&lc, I, "Reading of %s completed", rio->obj_name);
252 /* should not happen */
257 XSEGLOG2(&lc, I, "Resubmitting read of %s", rio->obj_name);
258 if (do_aio_read(peer, pr) < 0) {
259 XSEGLOG2(&lc, E, "Reading of %s failed on do_aio_read",
265 /* should not reach this */
266 printf("read request reached this\n");
/*
 * Handler for write requests, entered once on acceptance and again when the
 * asynchronous write finishes.
 *
 * ACCEPTED: checks the XF_FLUSH flag (what happens for flush requests is not
 *           visible in this excerpt), then moves to WRITING and submits the
 *           write with do_aio_write.
 * WRITING:  pr->retval of 0 means the write succeeded and req->serviced is
 *           set to req->datalen; otherwise the failure is logged. Writes are
 *           never resubmitted.
 * Any other state is unexpected and only reported with printf.
 * NOTE(review): the calls that complete or fail the request are not visible
 * in this excerpt.
 */
272 int handle_write(struct peerd *peer, struct peer_req *pr)
274 struct rados_io *rio = (struct rados_io *) (pr->priv);
275 struct xseg_request *req = pr->req;
/* First dispatch: start the asynchronous write. */
276 if (rio->state == ACCEPTED) {
279 if (req->flags & XF_FLUSH) {
/* NOTE(review): the note below names X_READ inside the write handler; X_WRITE is presumably meant - confirm. */
288 //should we ensure req->op = X_READ ?
289 rio->state = WRITING;
290 XSEGLOG2(&lc, I, "Writing %s", rio->obj_name);
291 if (do_aio_write(peer, pr) < 0) {
292 XSEGLOG2(&lc, E, "Writing of %s failed on do_aio_write",
/* Re-dispatch after the write finished: pr->retval carries its result. */
297 else if (rio->state == WRITING) {
298 /* rados writes return 0 if write succeeded or < 0 if failed
299 * no resubmission occurs
301 XSEGLOG2(&lc, I, "Writing of %s callback", rio->obj_name);
302 if (pr->retval == 0) {
303 XSEGLOG2(&lc, I, "Writing of %s completed", rio->obj_name);
304 req->serviced = req->datalen;
309 XSEGLOG2(&lc, E, "Writing of %s failed", rio->obj_name);
315 /* should not reach this */
316 printf("write request reached this\n");
/*
 * Handler for copy requests: reads req->size bytes of a source object into a
 * temporary buffer and then writes that buffer to the target object
 * (rio->obj_name). Entered once on acceptance and again after every
 * asynchronous read or write finishes.
 *
 * The source name comes from the xseg_request_copy found in the request data.
 *
 * ACCEPTED: allocates rio->src_name and copies the source name into it (cut
 *           to MAX_OBJ_NAME and NUL-terminated), allocates rio->buf with
 *           req->size bytes, moves to READING and submits the first read of
 *           the source object. One visible path completes the request right
 *           away; its condition is not visible in this excerpt.
 * READING:  accumulates the bytes read in rio->read; a result of 0 means end
 *           of object and the rest of the buffer is zero-filled. Once
 *           rio->read reaches req->size it moves to WRITING and submits one
 *           write of the whole buffer at req->offset; otherwise it resubmits
 *           the read for the remainder.
 * WRITING:  pr->retval of 0 marks the copy complete and sets req->serviced
 *           to req->size; otherwise the failure is logged.
 * Any other state is logged as unknown.
 *
 * NOTE(review): allocation checks, the initial value of rio->read, the
 * freeing of rio->buf and rio->src_name, and the calls that complete or fail
 * the request are not visible in this excerpt - confirm against the full file.
 */
322 int handle_copy(struct peerd *peer, struct peer_req *pr)
324 //struct radosd *rados = (struct radosd *) peer->priv;
325 struct xseg_request *req = pr->req;
326 struct rados_io *rio = (struct rados_io *) pr->priv;
328 struct xseg_request_copy *xcopy = (struct xseg_request_copy *)xseg_get_data(peer->xseg, req);
330 if (rio->state == ACCEPTED){
/* NOTE(review): rio->src_name is logged here before the allocation and copy below fill it in; it may be NULL or stale at this point - confirm. */
331 XSEGLOG2(&lc, I, "Copy of object %s to object %s started",
332 rio->src_name, rio->obj_name);
334 complete(peer, pr); //or fail?
/* Private copy of the source object name. */
338 rio->src_name = malloc(MAX_OBJ_NAME + 1);
343 //NULL terminate or fail if targetlen > MAX_OBJ_NAME ?
344 unsigned int end = (xcopy->targetlen > MAX_OBJ_NAME) ? MAX_OBJ_NAME : xcopy->targetlen;
345 strncpy(rio->src_name, xcopy->target, end);
346 rio->src_name[end] = 0;
/* Staging buffer that holds the data between the read and the write phase. */
348 rio->buf = malloc(req->size);
354 rio->state = READING;
356 XSEGLOG2(&lc, I, "Reading %s", rio->src_name);
357 if (do_aio_generic(peer, pr, X_READ, rio->src_name, rio->buf + rio->read,
358 req->size - rio->read, req->offset + rio->read) < 0) {
/* NOTE(review): this and several messages in the read phase print obj_name although the read targets src_name - confirm intent. */
359 XSEGLOG2(&lc, I, "Reading of %s failed on do_aio_read", rio->obj_name);
/* Re-dispatch after a read of the source finished. */
365 else if (rio->state == READING){
366 XSEGLOG2(&lc, I, "Reading of %s callback", rio->obj_name);
368 rio->read += pr->retval;
369 else if (pr->retval == 0) {
/* NOTE(review): the byte count logged is req->serviced, while progress of this phase is tracked in rio->read - confirm. */
370 XSEGLOG2(&lc, I, "Reading of %s reached end of file at "
371 "%llu bytes. Zeroing out rest", rio->obj_name,
372 (unsigned long long) req->serviced);
373 memset(rio->buf + rio->read, 0, req->size - rio->read);
374 rio->read = req->size ;
377 XSEGLOG2(&lc, E, "Reading of %s failed", rio->src_name);
/* Whole source range buffered: switch to the write phase. */
382 if (rio->read >= req->size) {
383 XSEGLOG2(&lc, I, "Reading of %s completed", rio->obj_name);
385 rio->state = WRITING;
386 XSEGLOG2(&lc, I, "Writing %s", rio->obj_name);
387 if (do_aio_generic(peer, pr, X_WRITE, rio->obj_name,
388 rio->buf, req->size, req->offset) < 0) {
389 XSEGLOG2(&lc, E, "Writing of %s failed on do_aio_write", rio->obj_name);
/* Short read: ask for the remainder of the source range. */
396 XSEGLOG2(&lc, I, "Resubmitting read of %s", rio->obj_name);
397 if (do_aio_generic(peer, pr, X_READ, rio->src_name, rio->buf + rio->read,
398 req->size - rio->read, req->offset + rio->read) < 0) {
399 XSEGLOG2(&lc, E, "Reading of %s failed on do_aio_read",
/* Re-dispatch after the write to the target finished. */
405 else if (rio->state == WRITING){
406 XSEGLOG2(&lc, I, "Writing of %s callback", rio->obj_name);
407 if (pr->retval == 0) {
408 XSEGLOG2(&lc, I, "Writing of %s completed", rio->obj_name);
409 XSEGLOG2(&lc, I, "Copy of object %s to object %s completed", rio->src_name, rio->obj_name);
410 req->serviced = req->size;
415 XSEGLOG2(&lc, E, "Writing of %s failed", rio->obj_name);
416 XSEGLOG2(&lc, E, "Copy of object %s to object %s failed", rio->src_name, rio->obj_name);
422 XSEGLOG2(&lc, E, "Unknown state");
/* Forget the source name pointer; presumably it was freed on a line not visible here - confirm. */
433 rio->src_name = NULL;
/*
 * Starts a detached thread that runs func with pr as its argument.
 *
 * peer - not used by the visible lines.
 * pr   - request handed to the thread; the thread id is stored in the tid
 *        field of its rados_io.
 * func - thread entry point.
 *
 * Returns the result of pthread_create (0 on success).
 * NOTE(review): no pthread_attr_destroy call is visible in this excerpt -
 * confirm whether the attribute object is released.
 */
443 int spawnthread(struct peerd *peer, struct peer_req *pr,
444 void *(*func)(void *arg))
446 //struct radosd *rados = (struct radosd *) peer->priv;
447 struct rados_io *rio = (struct rados_io *) (pr->priv);
/* Detached: nobody joins these worker threads. */
450 pthread_attr_init(&attr);
451 pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
453 return (pthread_create(&rio->tid, &attr, func, (void *) pr));
/*
 * Watch callback registered by lock_op through rados_watch().
 *
 * opcode, ver - not used by the visible lines.
 * arg         - the struct peer_req that set up the watch.
 *
 * For an X_OPEN request it signals rio->cond, waking the lock_op thread that
 * sleeps on it so that it retries the lock. Any other op is logged as an
 * error.
 * NOTE(review): the signal is sent without holding rio->m and lock_op waits
 * without a predicate, so a notification that arrives between a failed
 * rados_lock and the pthread_cond_wait call could be missed - confirm.
 */
456 void watch_cb(uint8_t opcode, uint64_t ver, void *arg)
459 struct peer_req *pr = (struct peer_req *)arg;
460 //struct radosd *rados = (struct radosd *) pr->peer->priv;
461 struct rados_io *rio = (struct rados_io *) (pr->priv);
463 if (pr->req->op == X_OPEN){
464 XSEGLOG2(&lc, I, "watch cb signaling rio of %s", rio->obj_name);
465 pthread_cond_signal(&rio->cond);
468 XSEGLOG2(&lc, E, "Invalid req op in watch_cb");
/*
 * Thread entry point (started by handle_open) that takes the exclusive RADOS
 * lock RADOS_LOCK_NAME on the object named rio->obj_name.
 *
 * arg - the struct peer_req being served.
 *
 * Unless the request carries XF_NOSYNC, a watch is first placed on the object
 * so that watch_cb can wake this thread. rados_lock is then retried in a
 * loop: with XF_NOSYNC a failed attempt is logged as an error; otherwise the
 * thread sleeps on rio->cond until signalled and tries again. After the lock
 * is taken the watch is removed (again only without XF_NOSYNC) and the
 * request is completed.
 * NOTE(review): the statements that fail the request and leave the function
 * on the error paths, and the return value, are not visible in this excerpt.
 */
471 void * lock_op(void *arg)
473 struct peer_req *pr = (struct peer_req *)arg;
474 struct radosd *rados = (struct radosd *) pr->peer->priv;
475 struct rados_io *rio = (struct rados_io *) (pr->priv);
477 XSEGLOG2(&lc, I, "Starting lock op for %s", rio->obj_name);
/* Blocking mode: register for notifications on the object before trying the lock. */
478 if (!(pr->req->flags & XF_NOSYNC)){
479 if (rados_watch(rados->ioctx, rio->obj_name, 0,
480 &rio->watch_handle, watch_cb, pr) < 0){
481 XSEGLOG2(&lc, E, "Rados watch failed for %s",
/* Retry until the exclusive lock is obtained. */
488 while(rados_lock(rados->ioctx, rio->obj_name, RADOS_LOCK_NAME,
489 C_LOCK_EXCLUSIVE, RADOS_LOCK_COOKIE, "", "", NULL, 0) < 0){
490 if (pr->req->flags & XF_NOSYNC){
491 XSEGLOG2(&lc, E, "Rados lock failed for %s",
497 XSEGLOG2(&lc, D, "rados lock for %s sleeping",
/* Sleep until watch_cb signals. NOTE(review): the wait has no predicate, so spurious or missed wakeups are not distinguished - confirm this is acceptable. */
499 pthread_mutex_lock(&rio->m);
500 pthread_cond_wait(&rio->cond, &rio->m);
501 pthread_mutex_unlock(&rio->m);
502 XSEGLOG2(&lc, D, "rados lock for %s woke up",
/* Lock held: the watch is no longer needed. */
506 if (!(pr->req->flags & XF_NOSYNC)){
507 if (rados_unwatch(rados->ioctx, rio->obj_name,
508 rio->watch_handle) < 0){
509 XSEGLOG2(&lc, E, "Rados unwatch failed");
512 XSEGLOG2(&lc, I, "Successfull lock op for %s", rio->obj_name);
513 complete(pr->peer, pr);
/*
 * Thread entry point (started by handle_close) that releases the RADOS lock
 * RADOS_LOCK_NAME on the object named rio->obj_name.
 *
 * arg - the struct peer_req being served.
 *
 * With XF_FORCE the lock is broken with rados_break_lock, otherwise it is
 * released with rados_unlock. A failure is logged together with the return
 * code. A rados_notify is then issued on the object, which is what triggers
 * watch_cb for threads waiting in lock_op, and the request is completed.
 * NOTE(review): the condition guarding the failure message, the statements
 * that fail the request, and the return value are not visible in this
 * excerpt.
 */
517 void * unlock_op(void *arg)
519 struct peer_req *pr = (struct peer_req *)arg;
520 struct radosd *rados = (struct radosd *) pr->peer->priv;
521 struct rados_io *rio = (struct rados_io *) (pr->priv);
523 XSEGLOG2(&lc, I, "Starting unlock op for %s", rio->obj_name);
524 if (pr->req->flags & XF_FORCE)
525 r = rados_break_lock(rados->ioctx, rio->obj_name, RADOS_LOCK_NAME,
528 r = rados_unlock(rados->ioctx, rio->obj_name, RADOS_LOCK_NAME,
531 XSEGLOG2(&lc, E, "Rados unlock failed for %s (r: %d)", rio->obj_name, r);
/* Tell watchers of the object that the lock state changed. */
535 if (rados_notify(rados->ioctx, rio->obj_name,
537 XSEGLOG2(&lc, E, "rados notify failed");
539 XSEGLOG2(&lc, I, "Successfull unlock op for %s", rio->obj_name);
540 complete(pr->peer, pr);
/*
 * Handler for open requests: runs lock_op for this request on a new detached
 * thread.
 * NOTE(review): how the spawnthread result r is handled and what is returned
 * is not visible in this excerpt.
 */
545 int handle_open(struct peerd *peer, struct peer_req *pr)
547 int r = spawnthread(peer, pr, lock_op);
/*
 * Handler for close requests: runs unlock_op for this request on a new
 * detached thread.
 * NOTE(review): how the spawnthread result r is handled and what is returned
 * is not visible in this excerpt.
 */
554 int handle_close(struct peerd *peer, struct peer_req *pr)
556 int r = spawnthread(peer, pr, unlock_op);
/*
 * Peer-specific initialisation: connects to the RADOS cluster, opens an I/O
 * context on the requested pool and attaches per-request state.
 *
 * peer       - peer being initialised; peer->priv receives the struct radosd
 *              and every peer->peer_reqs[i].priv receives a struct rados_io.
 * argc, argv - command line; the value following --pool names the pool.
 *
 * Steps visible here: allocate the radosd, read --pool (cut to
 * MAX_POOL_NAME), reject an empty pool name, create the cluster handle, read
 * the configuration file (NULL path), connect, look the pool up and try to
 * create it when the lookup fails, create the ioctx, then allocate and
 * initialise one rados_io per request slot (peer->nr_ops of them).
 * NOTE(review): the allocation checks, the initialisation of rados->pool
 * before it is tested, and the return statements are not visible in this
 * excerpt - confirm against the full file.
 */
562 int custom_peer_init(struct peerd *peer, int argc, char *argv[])
565 struct radosd *rados = malloc(sizeof(struct radosd));
566 struct rados_io *rio;
/* Pick up the pool name from the argument that follows --pool. */
572 for (i = 0; i < argc; i++) {
573 if (!strcmp(argv[i], "--pool") && (i+1) < argc){
574 strncpy(rados->pool, argv[i+1], MAX_POOL_NAME);
575 rados->pool[MAX_POOL_NAME] = 0;
580 if (!rados->pool[0]){
581 XSEGLOG2(&lc, E , "Pool must be provided");
/* Bring up the cluster connection. */
586 if (rados_create(&rados->cluster, NULL) < 0) {
587 XSEGLOG2(&lc, E, "Rados create failed!");
590 if (rados_conf_read_file(rados->cluster, NULL) < 0){
591 XSEGLOG2(&lc, E, "Error reading rados conf files!");
594 if (rados_connect(rados->cluster) < 0) {
595 XSEGLOG2(&lc, E, "Rados connect failed!");
596 rados_shutdown(rados->cluster);
/* Create the pool when the lookup does not find it. */
600 if (rados_pool_lookup(rados->cluster, rados->pool) < 0) {
601 XSEGLOG2(&lc, I, "Pool does not exists. I will try to create it");
602 if (rados_pool_create(rados->cluster, rados->pool) < 0){
603 XSEGLOG2(&lc, E, "Couldn't create pool %s", rados->pool);
604 rados_shutdown(rados->cluster);
608 XSEGLOG2(&lc, I, "Pool created.");
610 if (rados_ioctx_create(rados->cluster, rados->pool, &(rados->ioctx)) < 0){
611 XSEGLOG2(&lc, E, "ioctx create problem.");
612 rados_shutdown(rados->cluster);
616 peer->priv = (void *) rados;
/* One rados_io per request slot. */
617 for (i = 0; i < peer->nr_ops; i++) {
618 rio = malloc(sizeof(struct rados_io));
/* Release the rados_io objects attached to earlier slots (presumably on allocation failure; the guarding condition is not visible). */
621 //is this really necessary?
622 for (j = 0; j < i; j++) {
623 free(peer->peer_reqs[j].priv);
633 rio->watch_handle = 0;
634 pthread_cond_init(&rio->cond, NULL);
635 pthread_mutex_init(&rio->m, NULL);
636 peer->peer_reqs[i].priv = (void *) rio;
/*
 * Hook for peer-specific argument parsing.
 * NOTE(review): the body is not visible in this excerpt; going by the
 * existing remark below it does no work at present.
 */
641 // nothing to do here for now
642 int custom_arg_parse(int argc, const char *argv[])
/* Peer teardown hook; its body is not visible in this excerpt. */
647 void custom_peer_finalize(struct peerd *peer)
/*
 * Entry point for every request event: routes the request to the handler for
 * its operation.
 *
 * peer   - owning peer.
 * pr     - request being dispatched; its priv is the struct rados_io.
 * req    - not used by the visible lines, which read pr->req instead.
 * reason - why dispatch was called; dispatch_accept resets the state to
 *          ACCEPTED, while the completion callbacks pass dispatch_internal.
 *
 * On every call the request target is copied into rio->obj_name, cut to
 * MAX_OBJ_NAME and NUL-terminated, before the switch on pr->req->op.
 * NOTE(review): the case labels, the conditions that choose defer_request
 * over the delete, info and copy handlers, and the return statement are not
 * visible in this excerpt.
 */
652 int dispatch(struct peerd *peer, struct peer_req *pr, struct xseg_request *req,
653 enum dispatch_reason reason)
655 struct rados_io *rio = (struct rados_io *) (pr->priv);
656 char *target = xseg_get_target(peer->xseg, pr->req);
/* The target is length-delimited by targetlen, so terminate the copy explicitly. */
657 unsigned int end = (pr->req->targetlen > MAX_OBJ_NAME) ? MAX_OBJ_NAME : pr->req->targetlen;
658 strncpy(rio->obj_name, target, end);
659 rio->obj_name[end] = 0;
660 //log_pr("dispatch", pr);
661 if (reason == dispatch_accept)
662 rio->state = ACCEPTED;
664 switch (pr->req->op){
666 handle_read(peer, pr); break;
668 handle_write(peer, pr); break;
671 defer_request(peer, pr);
673 handle_delete(peer, pr);
677 defer_request(peer, pr);
679 handle_info(peer, pr);
683 defer_request(peer, pr);
685 handle_copy(peer, pr);
688 handle_open(peer, pr); break;
690 handle_close(peer, pr); break;