6 #include <rados/librados.h>
7 #include <xseg/protocol.h>
// Longest accepted pool name. radosd.pool below has one extra byte so the
// name is always NUL-terminated (custom_peer_init copies at most
// MAX_POOL_NAME bytes and then terminates explicitly).
10 #define MAX_POOL_NAME 64
// Longest object name kept per request; tied to the xseg target length limit.
11 #define MAX_OBJ_NAME XSEG_MAX_TARGETLEN
// struct radosd member: name of the RADOS pool this peer serves (--pool).
23 char pool[MAX_POOL_NAME + 1];
// struct rados_io member: NUL-terminated copy of the request target,
// refreshed by dispatch() on every call (truncated to MAX_OBJ_NAME bytes).
27 char obj_name[MAX_OBJ_NAME + 1];
// struct rados_io member: position in the per-request state machine; the
// handlers in this file use ACCEPTED, PENDING, READING and WRITING.
28 enum rados_state state;
// struct rados_io member: handle filled by rados_watch() in lock_op() and
// passed back to rados_unwatch(); reset to 0 in custom_peer_init().
32 uint64_t watch_handle;
// librados completion callback for reads, removes and stats. do_aio_generic
// passes it as the cb_complete (ack) argument of
// rados_aio_create_completion(). `arg` is the peer_req the operation was
// issued for.
// The callback reads the operation's return value and re-enters the
// request's handler via dispatch() with dispatch_internal. That reason
// does not reset rio->state, so the handler resumes from the state it set
// before submitting.
// NOTE(review): the lines between reading `ret` and dispatching are not
// visible here. Presumably `ret` is stored in pr->retval, which the
// handlers inspect — confirm.
38 void rados_ack_cb(rados_completion_t c, void *arg)
40 struct peer_req *pr = (struct peer_req*) arg;
41 struct peerd *peer = pr->peer;
42 int ret = rados_aio_get_return_value(c);
45 dispatch(peer, pr, pr->req, dispatch_internal);
// librados completion callback for writes. do_aio_generic passes it as the
// cb_safe (commit) argument with no cb_complete, so the write handler is
// re-entered only once the write is reported safe.
// Otherwise it mirrors rados_ack_cb: it fetches the operation's return
// value and re-dispatches the request with dispatch_internal.
// NOTE(review): as in rados_ack_cb, the hand-off of `ret` (presumably into
// pr->retval) happens on lines not visible here — confirm.
48 void rados_commit_cb(rados_completion_t c, void *arg)
50 struct peer_req *pr = (struct peer_req*) arg;
51 struct peerd *peer = pr->peer;
52 int ret = rados_aio_get_return_value(c);
55 dispatch(peer, pr, pr->req, dispatch_internal);
// Submits one asynchronous RADOS operation on object `target` on behalf of
// request `pr`, using the pool I/O context held in peer->priv.
// The switch on `op` sits on lines not visible here. Judging by the calls
// issued and by the callers:
//  - X_READ reads into `buf`;
//  - X_WRITE writes from `buf`;
//  - X_DELETE removes the object;
//  - X_INFO stats it, storing the object size in rio->size.
// Completion re-enters the request's handler through rados_ack_cb
// (read/remove/stat) or rados_commit_cb (write).
// Callers treat a negative return value as a submission failure.
// NOTE(review): the read/write argument lists continue on elided lines;
// presumably they pass buf, size and offset — confirm.
58 static int do_aio_generic(struct peerd *peer, struct peer_req *pr, uint32_t op,
59 char *target, char *buf, uint64_t size, uint64_t offset)
61 struct radosd *rados = (struct radosd *) peer->priv;
62 struct rados_io *rio = (struct rados_io *) pr->priv;
65 rados_completion_t rados_compl;
// Read: completion is reported through rados_ack_cb.
68 r = rados_aio_create_completion(pr, rados_ack_cb, NULL, &rados_compl);
71 r = rados_aio_read(rados->ioctx, target, rados_compl,
// Write: only the commit (safe) callback is registered.
75 r = rados_aio_create_completion(pr, NULL, rados_commit_cb, &rados_compl);
78 r = rados_aio_write(rados->ioctx, target, rados_compl,
// Remove.
82 r = rados_aio_create_completion(pr, rados_ack_cb, NULL, &rados_compl);
85 r = rados_aio_remove(rados->ioctx, target, rados_compl);
// Stat: the size goes to rio->size; the mtime is not requested (NULL).
88 r = rados_aio_create_completion(pr, rados_ack_cb, NULL, &rados_compl);
91 r = rados_aio_stat(rados->ioctx, target, rados_compl, &rio->size, NULL);
// Releases the completion handle; which paths reach this line is not
// visible here.
98 rados_aio_release(rados_compl);
// Submits an async read of the part of `pr` not yet serviced. It reads
// req->size - req->serviced bytes of rio->obj_name, starting at object
// offset req->offset + req->serviced, into the request's data buffer at
// position req->serviced.
// handle_read uses it both for the first read and for resubmissions after
// a short read. Returns do_aio_generic()'s result.
103 static int do_aio_read(struct peerd *peer, struct peer_req *pr)
105 struct xseg_request *req = pr->req;
106 struct rados_io *rio = (struct rados_io *) pr->priv;
107 char *data = xseg_get_data(peer->xseg, pr->req);
109 return do_aio_generic(peer, pr, X_READ, rio->obj_name,
110 data + req->serviced,
111 req->size - req->serviced,
112 req->offset + req->serviced);
// Submits an async write of the part of `pr` not yet serviced. It writes
// req->size - req->serviced bytes from the request's data buffer
// (starting at position req->serviced) to rio->obj_name at object offset
// req->offset + req->serviced.
// Returns do_aio_generic()'s result.
115 static int do_aio_write(struct peerd *peer, struct peer_req *pr)
117 struct xseg_request *req = pr->req;
118 struct rados_io *rio = (struct rados_io *) pr->priv;
119 char *data = xseg_get_data(peer->xseg, pr->req);
121 return do_aio_generic(peer, pr, X_WRITE, rio->obj_name,
122 data + req->serviced,
123 req->size - req->serviced,
124 req->offset + req->serviced);
// Handles a delete request in two passes driven by dispatch():
//  - ACCEPTED: mark the request PENDING and submit an async remove of
//    rio->obj_name;
//  - completion: rados_ack_cb re-dispatches it (still PENDING) and the
//    outcome is logged as failed or completed.
// The branch conditions and the complete/fail calls are on lines not
// visible here.
// NOTE(review): the commented-out `rados` declaration is dead code and
// can be removed.
127 int handle_delete(struct peerd *peer, struct peer_req *pr)
130 //struct radosd *rados = (struct radosd *) peer->priv;
131 struct rados_io *rio = (struct rados_io *) pr->priv;
133 if (rio->state == ACCEPTED) {
134 XSEGLOG2(&lc, I, "Deleting %s", rio->obj_name);
135 rio->state = PENDING;
136 r = do_aio_generic(peer, pr, X_DELETE, rio->obj_name, NULL, 0, 0);
// presumably: submission failed (r < 0)
138 XSEGLOG2(&lc, E, "Deletion of %s failed", rio->obj_name);
// presumably: the completion reported an error
144 XSEGLOG2(&lc, E, "Deletion of %s failed", rio->obj_name);
148 XSEGLOG2(&lc, I, "Deletion of %s completed", rio->obj_name);
// Handles an info request by looking up the size of rio->obj_name:
//  - ACCEPTED: mark PENDING and submit an async stat (the size lands in
//    rio->size);
//  - completion (re-dispatched via rados_ack_cb): either log the failure,
//    or write the size into the struct xseg_reply_info held in the
//    request's data buffer and set pr->retval to sizeof(uint64_t).
// NOTE(review): no check is visible that the request data buffer is large
// enough for struct xseg_reply_info. Confirm it is validated before
// xinfo->size is written.
155 int handle_info(struct peerd *peer, struct peer_req *pr)
158 struct xseg_request *req = pr->req;
159 //struct radosd *rados = (struct radosd *) peer->priv;
160 struct rados_io *rio = (struct rados_io *) pr->priv;
161 char *req_data = xseg_get_data(peer->xseg, req);
162 struct xseg_reply_info *xinfo = (struct xseg_reply_info *)req_data;
164 if (rio->state == ACCEPTED) {
165 XSEGLOG2(&lc, I, "Getting info of %s", rio->obj_name);
166 rio->state = PENDING;
167 r = do_aio_generic(peer, pr, X_INFO, rio->obj_name, NULL, 0, 0);
169 XSEGLOG2(&lc, E, "Getting info of %s failed", rio->obj_name);
176 XSEGLOG2(&lc, E, "Getting info of %s failed", rio->obj_name);
// Success: publish the object size in the reply buffer.
180 xinfo->size = rio->size;
181 pr->retval = sizeof(uint64_t);
182 XSEGLOG2(&lc, I, "Getting info of %s completed", rio->obj_name);
// Handles a read request as a small state machine driven by dispatch():
//  - ACCEPTED: switch to READING and submit an async read of the
//    unserviced part of the request (do_aio_read).
//  - READING (re-entered from rados_ack_cb): inspect pr->retval.
//    - A positive value (condition elided) is added to req->serviced.
//    - 0 means the end of the object was reached, so the rest of the
//      buffer is zero-filled.
//    - -2 (object not found) yields an all-zero buffer.
//    - Any other negative value is a failure.
//    Once req->serviced reaches req->datalen the read is finished;
//    otherwise the remainder is resubmitted.
// NOTE(review): do_aio_read sizes each read by req->size, but the
// zero-fill and the completion test use req->datalen. Confirm the two are
// always equal for reads, or use one of them consistently.
// NOTE(review): -2 is presumably -ENOENT (as on Linux); prefer the named
// constant from <errno.h>.
189 int handle_read(struct peerd *peer, struct peer_req *pr)
191 struct rados_io *rio = (struct rados_io *) (pr->priv);
192 struct xseg_request *req = pr->req;
194 if (rio->state == ACCEPTED) {
199 rio->state = READING;
200 XSEGLOG2(&lc, I, "Reading %s", rio->obj_name);
// NOTE(review): this submission failure is logged at level I, while the
// same failure on resubmission (line 249) is logged at level E.
201 if (do_aio_read(peer, pr) < 0) {
202 XSEGLOG2(&lc, I, "Reading of %s failed on do_aio_read",
// Completion of a previously submitted read.
207 else if (rio->state == READING) {
208 XSEGLOG2(&lc, I, "Reading of %s callback", rio->obj_name);
209 data = xseg_get_data(peer->xseg, pr->req);
211 req->serviced += pr->retval;
212 else if (pr->retval == 0) {
213 XSEGLOG2(&lc, I, "Reading of %s reached end of file at "
214 "%llu bytes. Zeroing out rest", rio->obj_name,
215 (unsigned long long) req->serviced);
216 /* reached end of object. zero out rest of data
217 * requested from this object
219 memset(data + req->serviced, 0, req->datalen - req->serviced);
220 req->serviced = req->datalen;
222 else if (pr->retval == -2) {
223 XSEGLOG2(&lc, I, "Reading of %s return -2. "
224 "Zeroing out data", rio->obj_name);
225 /* object not found. return zeros instead */
226 memset(data, 0, req->datalen);
227 req->serviced = req->datalen;
230 XSEGLOG2(&lc, E, "Reading of %s failed", rio->obj_name);
231 /* pr->retval < 0 && pr->retval != -2 */
// Done once the whole buffer is accounted for; otherwise resubmit.
235 if (req->serviced >= req->datalen) {
236 XSEGLOG2(&lc, I, "Reading of %s completed", rio->obj_name);
242 /* should not happen */
247 XSEGLOG2(&lc, I, "Resubmitting read of %s", rio->obj_name);
248 if (do_aio_read(peer, pr) < 0) {
249 XSEGLOG2(&lc, E, "Reading of %s failed on do_aio_read",
// NOTE(review): every other diagnostic goes through XSEGLOG2; this printf
// to stdout bypasses the logger.
255 /* should not reach this */
256 printf("read request reached this\n");
// Handles a write request:
//  - ACCEPTED: requests flagged XF_FLUSH take a separate path (elided
//    here). Otherwise switch to WRITING and submit one async write of the
//    unserviced part of the data buffer (do_aio_write).
//  - WRITING (re-entered from rados_commit_cb): a pr->retval of 0 means
//    the write succeeded, and req->serviced is set to req->datalen.
//    Anything else is a failure. Writes are never resubmitted.
// NOTE(review): the write length is derived from req->size, but success
// marks req->datalen bytes as serviced. Confirm the two always agree.
262 int handle_write(struct peerd *peer, struct peer_req *pr)
264 struct rados_io *rio = (struct rados_io *) (pr->priv);
265 struct xseg_request *req = pr->req;
266 if (rio->state == ACCEPTED) {
269 if (req->flags & XF_FLUSH) {
// NOTE(review): in a write handler the question below presumably means
// X_WRITE, not X_READ.
278 //should we ensure req->op = X_READ ?
279 rio->state = WRITING;
280 XSEGLOG2(&lc, I, "Writing %s", rio->obj_name);
281 if (do_aio_write(peer, pr) < 0) {
282 XSEGLOG2(&lc, E, "Writing of %s failed on do_aio_write",
287 else if (rio->state == WRITING) {
288 /* rados writes return 0 if write succeeded or < 0 if failed
289 * no resubmission occurs
291 XSEGLOG2(&lc, I, "Writing of %s callback", rio->obj_name);
292 if (pr->retval == 0) {
293 XSEGLOG2(&lc, I, "Writing of %s completed", rio->obj_name);
294 req->serviced = req->datalen;
299 XSEGLOG2(&lc, E, "Writing of %s failed", rio->obj_name);
// NOTE(review): every other diagnostic goes through XSEGLOG2; this printf
// to stdout bypasses the logger.
305 /* should not reach this */
306 printf("write request reached this\n");
// Handles a copy request. It copies req->size bytes at req->offset from
// the source object named in the request's struct xseg_request_copy
// payload to the same range of rio->obj_name, by reading into a heap
// buffer and then writing it out.
//  - ACCEPTED: copy the source name (truncated to MAX_OBJ_NAME) into a
//    freshly malloc'd rio->src_name and allocate rio->buf of req->size
//    bytes. Then switch to READING and submit an async read from the
//    source.
//  - READING: add positive pr->retval values to rio->read. A 0 means the
//    end of the source object, so the rest of the buffer is zero-filled.
//    Other values fail the copy. When rio->read reaches req->size, switch
//    to WRITING and write the whole buffer to the destination; otherwise
//    resubmit a read for the remainder.
//  - WRITING: a pr->retval of 0 means success, and req->serviced becomes
//    req->size.
//  - Cleanup (mostly elided): rio->src_name is reset to NULL.
// NOTE(review): unlike handle_read, a missing source object (-2) is not
// special-cased in the visible lines, so it fails the copy.
// NOTE(review): the malloc() results for rio->src_name and rio->buf are
// checked, if at all, on lines not visible here — confirm.
312 int handle_copy(struct peerd *peer, struct peer_req *pr)
314 //struct radosd *rados = (struct radosd *) peer->priv;
315 struct xseg_request *req = pr->req;
316 struct rados_io *rio = (struct rados_io *) pr->priv;
318 struct xseg_request_copy *xcopy = (struct xseg_request_copy *)xseg_get_data(peer->xseg, req);
320 if (rio->state == ACCEPTED){
// NOTE(review): rio->src_name is printed here before it is allocated and
// filled (line 328). After a previous copy it is NULL (line 423), and
// passing NULL for %s is undefined behavior. Log after the name has been
// copied instead.
321 XSEGLOG2(&lc, I, "Copy of object %s to object %s started",
322 rio->src_name, rio->obj_name);
324 complete(peer, pr); //or fail?
328 rio->src_name = malloc(MAX_OBJ_NAME + 1);
333 //NULL terminate or fail if targetlen > MAX_OBJ_NAME ?
334 unsigned int end = (xcopy->targetlen > MAX_OBJ_NAME) ? MAX_OBJ_NAME : xcopy->targetlen;
335 strncpy(rio->src_name, xcopy->target, end);
336 rio->src_name[end] = 0;
// Staging buffer holding the whole range being copied.
338 rio->buf = malloc(req->size);
344 rio->state = READING;
346 XSEGLOG2(&lc, I, "Reading %s", rio->src_name);
347 if (do_aio_generic(peer, pr, X_READ, rio->src_name, rio->buf + rio->read,
348 req->size - rio->read, req->offset + rio->read) < 0) {
// NOTE(review): the failed read was of rio->src_name, yet this names the
// destination and logs at level I (compare the E-level log at line 389).
349 XSEGLOG2(&lc, I, "Reading of %s failed on do_aio_read", rio->obj_name);
// Read completion. The "Reading of" logs in this branch name
// rio->obj_name (the destination), although the data comes from
// rio->src_name.
355 else if (rio->state == READING){
356 XSEGLOG2(&lc, I, "Reading of %s callback", rio->obj_name);
358 rio->read += pr->retval;
// NOTE(review): the log below reports req->serviced, which this read phase
// does not update; rio->read holds the byte count.
359 else if (pr->retval == 0) {
360 XSEGLOG2(&lc, I, "Reading of %s reached end of file at "
361 "%llu bytes. Zeroing out rest", rio->obj_name,
362 (unsigned long long) req->serviced);
363 memset(rio->buf + rio->read, 0, req->size - rio->read);
364 rio->read = req->size ;
367 XSEGLOG2(&lc, E, "Reading of %s failed", rio->src_name);
// Source fully read (or zero-filled): write the whole buffer to the
// destination. Otherwise read the remainder.
372 if (rio->read >= req->size) {
373 XSEGLOG2(&lc, I, "Reading of %s completed", rio->obj_name);
375 rio->state = WRITING;
376 XSEGLOG2(&lc, I, "Writing %s", rio->obj_name);
377 if (do_aio_generic(peer, pr, X_WRITE, rio->obj_name,
378 rio->buf, req->size, req->offset) < 0) {
379 XSEGLOG2(&lc, E, "Writing of %s failed on do_aio_write", rio->obj_name);
386 XSEGLOG2(&lc, I, "Resubmitting read of %s", rio->obj_name);
387 if (do_aio_generic(peer, pr, X_READ, rio->src_name, rio->buf + rio->read,
388 req->size - rio->read, req->offset + rio->read) < 0) {
389 XSEGLOG2(&lc, E, "Reading of %s failed on do_aio_read",
// Write completion (re-entered from rados_commit_cb).
395 else if (rio->state == WRITING){
396 XSEGLOG2(&lc, I, "Writing of %s callback", rio->obj_name);
397 if (pr->retval == 0) {
398 XSEGLOG2(&lc, I, "Writing of %s completed", rio->obj_name);
399 XSEGLOG2(&lc, I, "Copy of object %s to object %s completed", rio->src_name, rio->obj_name);
400 req->serviced = req->size;
405 XSEGLOG2(&lc, E, "Writing of %s failed", rio->obj_name);
406 XSEGLOG2(&lc, E, "Copy of object %s to object %s failed", rio->src_name, rio->obj_name);
412 XSEGLOG2(&lc, E, "Unknown state");
// Cleanup: forget the source name so the next request on this rio starts
// clean (the surrounding release code is elided).
423 rio->src_name = NULL;
// Runs `func(pr)` on a new detached thread and stores the thread id in
// rio->tid. Returns pthread_create()'s result: 0 on success, otherwise a
// positive error number (not -1/errno).
// NOTE(review): `attr` is never passed to pthread_attr_destroy(). Because
// the function returns straight from pthread_create(), capture the result,
// destroy the attribute object, and then return.
// NOTE(review): `peer` is unused apart from the commented-out line.
433 int spawnthread(struct peerd *peer, struct peer_req *pr,
434 void *(*func)(void *arg))
436 //struct radosd *rados = (struct radosd *) peer->priv;
437 struct rados_io *rio = (struct rados_io *) (pr->priv);
440 pthread_attr_init(&attr);
441 pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
443 return (pthread_create(&rio->tid, &attr, func, (void *) pr));
// rados_watch() notification callback registered by lock_op(). `arg` is
// the peer_req waiting for the lock. For X_OPEN requests it wakes the
// lock_op thread blocked on rio->cond so that it retries rados_lock();
// any other op is logged as an error. `opcode` and `ver` are unused.
// NOTE(review): lock_op waits on rio->cond with no shared predicate. A
// notify that arrives after lock_op's failed rados_lock() but before it
// reaches pthread_cond_wait() is therefore lost, and lock_op sleeps until
// the next notify. Consider a flag or counter protected by rio->m: set it
// here while holding the mutex, and check it in lock_op's wait loop.
446 void watch_cb(uint8_t opcode, uint64_t ver, void *arg)
449 struct peer_req *pr = (struct peer_req *)arg;
450 //struct radosd *rados = (struct radosd *) pr->peer->priv;
451 struct rados_io *rio = (struct rados_io *) (pr->priv);
453 if (pr->req->op == X_OPEN){
454 XSEGLOG2(&lc, I, "watch cb signaling rio of %s", rio->obj_name);
455 pthread_cond_signal(&rio->cond);
458 XSEGLOG2(&lc, E, "Invalid req op in watch_cb");
// Thread body spawned by handle_open(). It acquires the RADOS lock on
// rio->obj_name and then completes the request. The steps are:
//  1. Unless XF_NOSYNC is set, register a watch (watch_cb) on the object
//     so that unlock_op()'s rados_notify() can wake this thread.
//  2. Retry rados_lock() in a loop, sleeping on rio->cond between
//     attempts.
//  3. Drop the watch again (only if one was registered).
// NOTE(review): the test inside the retry loop (line 479) checks
// !(flags & XF_NOSYNC), which is the same case in which a watch was
// registered, before logging "Rados lock failed". If the elided lines fail
// the request there, then XF_NOSYNC requests are the ones that reach the
// cond-wait, even though no watch exists to wake them. The test looks
// inverted; confirm against lines 481-485.
461 void * lock_op(void *arg)
463 struct peer_req *pr = (struct peer_req *)arg;
464 struct radosd *rados = (struct radosd *) pr->peer->priv;
465 struct rados_io *rio = (struct rados_io *) (pr->priv);
467 XSEGLOG2(&lc, I, "Starting lock op for %s", rio->obj_name);
468 if (!(pr->req->flags & XF_NOSYNC)){
469 if (rados_watch(rados->ioctx, rio->obj_name, 0,
470 &rio->watch_handle, watch_cb, pr) < 0){
471 XSEGLOG2(&lc, E, "Rados watch failed for %s",
478 while(rados_lock(rados->ioctx, rio->obj_name) < 0){
479 if (!(pr->req->flags & XF_NOSYNC)){
480 XSEGLOG2(&lc, E, "Rados lock failed for %s",
486 XSEGLOG2(&lc, D, "rados lock for %s sleeping",
// Sleep until watch_cb signals (see the lost-wakeup note there).
488 pthread_mutex_lock(&rio->m);
489 pthread_cond_wait(&rio->cond, &rio->m);
490 pthread_mutex_unlock(&rio->m);
491 XSEGLOG2(&lc, D, "rados lock for %s woke up",
// Lock held: remove the watch that was registered above.
495 if (!(pr->req->flags & XF_NOSYNC)){
496 if (rados_unwatch(rados->ioctx, rio->obj_name,
497 rio->watch_handle) < 0){
498 XSEGLOG2(&lc, E, "Rados unwatch failed");
501 XSEGLOG2(&lc, I, "Successfull lock op for %s", rio->obj_name);
502 complete(pr->peer, pr);
// Thread body spawned by handle_close(). It releases the RADOS lock on
// rio->obj_name, then sends rados_notify() on the object so that watchers
// registered by lock_op() (via watch_cb) retry their lock attempt.
// Finally it completes the request.
// NOTE(review): how a failed unlock or a failed notify is handled (fail
// vs. carry on) is on lines not visible here; line 524 completes the
// request.
506 void * unlock_op(void *arg)
508 struct peer_req *pr = (struct peer_req *)arg;
509 struct radosd *rados = (struct radosd *) pr->peer->priv;
510 struct rados_io *rio = (struct rados_io *) (pr->priv);
512 XSEGLOG2(&lc, I, "Starting unlock op for %s", rio->obj_name);
513 r = rados_unlock(rados->ioctx, rio->obj_name);
515 XSEGLOG2(&lc, E, "Rados unlock failed for %s", rio->obj_name);
519 if (rados_notify(rados->ioctx, rio->obj_name,
521 XSEGLOG2(&lc, E, "rados notify failed");
523 XSEGLOG2(&lc, I, "Successfull unlock op for %s", rio->obj_name);
524 complete(pr->peer, pr);
// Handles an open (lock) request by running lock_op() on a detached
// thread; the request is completed from that thread.
// NOTE(review): spawnthread() returns pthread_create()'s result, which is
// a positive error number on failure. Make sure the elided check on `r`
// tests for non-zero rather than < 0.
529 int handle_open(struct peerd *peer, struct peer_req *pr)
531 int r = spawnthread(peer, pr, lock_op);
// Handles a close (unlock) request by running unlock_op() on a detached
// thread; the request is completed from that thread.
// NOTE(review): spawnthread() returns pthread_create()'s result, which is
// a positive error number on failure. Make sure the elided check on `r`
// tests for non-zero rather than < 0.
538 int handle_close(struct peerd *peer, struct peer_req *pr)
540 int r = spawnthread(peer, pr, unlock_op);
// Peer initialisation hook. It:
//  - parses --pool from argv;
//  - connects to the Ceph cluster (NULL client id and NULL config path);
//  - makes sure the pool exists, creating it if the lookup fails;
//  - opens an I/O context on the pool;
//  - allocates one struct rados_io per peer request slot (peer->nr_ops),
//    initialising its watch handle, condition variable and mutex.
// NOTE(review): the malloc() at line 549 has no visible NULL check, and
// malloc() does not zero memory. Unless the elided lines 551-555 clear
// rados->pool, the "Pool must be provided" test at line 564 reads an
// uninitialised byte when --pool is absent.
// NOTE(review): the rados_conf_read_file() failure path shows no
// rados_shutdown(), unlike the connect/pool/ioctx failure paths.
546 int custom_peer_init(struct peerd *peer, int argc, char *argv[])
549 struct radosd *rados = malloc(sizeof(struct radosd));
550 struct rados_io *rio;
556 for (i = 0; i < argc; i++) {
557 if (!strcmp(argv[i], "--pool") && (i+1) < argc){
558 strncpy(rados->pool, argv[i+1], MAX_POOL_NAME);
559 rados->pool[MAX_POOL_NAME] = 0;
564 if (!rados->pool[0]){
565 XSEGLOG2(&lc, E , "Pool must be provided");
570 if (rados_create(&rados->cluster, NULL) < 0) {
571 XSEGLOG2(&lc, E, "Rados create failed!");
574 if (rados_conf_read_file(rados->cluster, NULL) < 0){
575 XSEGLOG2(&lc, E, "Error reading rados conf files!");
578 if (rados_connect(rados->cluster) < 0) {
579 XSEGLOG2(&lc, E, "Rados connect failed!");
580 rados_shutdown(rados->cluster);
// A failed lookup is treated as "pool missing": try to create it.
584 if (rados_pool_lookup(rados->cluster, rados->pool) < 0) {
585 XSEGLOG2(&lc, I, "Pool does not exists. I will try to create it");
586 if (rados_pool_create(rados->cluster, rados->pool) < 0){
587 XSEGLOG2(&lc, E, "Couldn't create pool %s", rados->pool);
588 rados_shutdown(rados->cluster);
592 XSEGLOG2(&lc, I, "Pool created.");
594 if (rados_ioctx_create(rados->cluster, rados->pool, &(rados->ioctx)) < 0){
595 XSEGLOG2(&lc, E, "ioctx create problem.");
596 rados_shutdown(rados->cluster);
600 peer->priv = (void *) rados;
// One rados_io per request slot, stored in peer->peer_reqs[i].priv.
601 for (i = 0; i < peer->nr_ops; i++) {
602 rio = malloc(sizeof(struct rados_io));
// Answer to the question below: yes. This loop (presumably reached when
// malloc() returns NULL) frees the rados_io objects allocated on earlier
// iterations, which would otherwise leak.
// NOTE(review): the cluster handle, the ioctx and `rados` itself are not
// visibly released on this path.
605 //is this really necessary?
606 for (j = 0; j < i; j++) {
607 free(peer->peer_reqs[j].priv);
617 rio->watch_handle = 0;
618 pthread_cond_init(&rio->cond, NULL);
619 pthread_mutex_init(&rio->m, NULL);
620 peer->peer_reqs[i].priv = (void *) rio;
625 // nothing to do here for now
// Argument-parsing hook. This peer's own option, --pool, is read in
// custom_peer_init() instead.
626 int custom_arg_parse(int argc, const char *argv[])
631 int dispatch(struct peerd *peer, struct peer_req *pr, struct xseg_request *req,
632 enum dispatch_reason reason)
634 struct rados_io *rio = (struct rados_io *) (pr->priv);
635 char *target = xseg_get_target(peer->xseg, pr->req);
636 unsigned int end = (pr->req->targetlen > MAX_OBJ_NAME) ? MAX_OBJ_NAME : pr->req->targetlen;
637 strncpy(rio->obj_name, target, end);
638 rio->obj_name[end] = 0;
639 //log_pr("dispatch", pr);
640 if (reason == dispatch_accept)
641 rio->state = ACCEPTED;
643 switch (pr->req->op){
645 handle_read(peer, pr); break;
647 handle_write(peer, pr); break;
650 defer_request(peer, pr);
652 handle_delete(peer, pr);
656 defer_request(peer, pr);
658 handle_info(peer, pr);
662 defer_request(peer, pr);
664 handle_copy(peer, pr);
667 handle_open(peer, pr); break;
669 handle_close(peer, pr); break;