2 * Copyright 2012 GRNET S.A. All rights reserved.
4 * Redistribution and use in source and binary forms, with or
5 * without modification, are permitted provided that the following
8 * 1. Redistributions of source code must retain the above
9 * copyright notice, this list of conditions and the following
11 * 2. Redistributions in binary form must reproduce the above
12 * copyright notice, this list of conditions and the following
13 * disclaimer in the documentation and/or other materials
14 * provided with the distribution.
16 * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17 * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19 * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20 * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23 * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24 * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26 * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27 * POSSIBILITY OF SUCH DAMAGE.
29 * The views and conclusions contained in the software and
30 * documentation are those of the authors and should not be
31 * interpreted as representing official policies, either expressed
32 * or implied, of GRNET S.A.
38 #include <xseg/xseg.h>
40 #include <rados/librados.h>
41 #include <xseg/protocol.h>
43 #include <openssl/sha.h>
45 #ifndef SHA256_DIGEST_SIZE
46 #define SHA256_DIGEST_SIZE 32
48 /* hex representation of sha256 value takes up double the sha256 size */
49 #define HEXLIFIED_SHA256_DIGEST_SIZE (SHA256_DIGEST_SIZE << 1)
52 #define LOCK_SUFFIX "_lock"
53 #define LOCK_SUFFIX_LEN 5
55 #define MAX_POOL_NAME 64
56 #define MAX_OBJ_NAME (XSEG_MAX_TARGETLEN + LOCK_SUFFIX_LEN + 1)
57 #define RADOS_LOCK_NAME "RadosLock"
58 //#define RADOS_LOCK_COOKIE "Cookie"
59 #define RADOS_LOCK_COOKIE "foo"
61 void custom_peer_usage()
63 fprintf(stderr, "Custom peer options:\n"
64 "--pool: Rados pool to connect\n"
69 * Unsafe. Doesn't check if data length is odd!
71 static void hexlify(unsigned char *data, char *hex)
74 for (i=0; i<SHA256_DIGEST_LENGTH; i++)
75 sprintf(hex+2*i, "%02x", data[i]);
78 static void unhexlify(char *hex, unsigned char *data)
82 for (i=0; i<SHA256_DIGEST_LENGTH; i++){
97 data[i] |= (c << 4) & 0xF0;
127 char pool[MAX_POOL_NAME + 1];
131 char obj_name[MAX_OBJ_NAME + 1];
132 enum rados_state state;
134 char *second_name, *buf;
136 uint64_t watch_handle;
142 void rados_ack_cb(rados_completion_t c, void *arg)
144 struct peer_req *pr = (struct peer_req*) arg;
145 struct peerd *peer = pr->peer;
146 int ret = rados_aio_get_return_value(c);
148 rados_aio_release(c);
149 dispatch(peer, pr, pr->req, dispatch_internal);
152 void rados_commit_cb(rados_completion_t c, void *arg)
154 struct peer_req *pr = (struct peer_req*) arg;
155 struct peerd *peer = pr->peer;
156 int ret = rados_aio_get_return_value(c);
158 rados_aio_release(c);
159 dispatch(peer, pr, pr->req, dispatch_internal);
162 static int do_aio_generic(struct peerd *peer, struct peer_req *pr, uint32_t op,
163 char *target, char *buf, uint64_t size, uint64_t offset)
165 struct radosd *rados = (struct radosd *) peer->priv;
166 struct rados_io *rio = (struct rados_io *) pr->priv;
169 rados_completion_t rados_compl;
172 r = rados_aio_create_completion(pr, rados_ack_cb, NULL, &rados_compl);
175 r = rados_aio_read(rados->ioctx, target, rados_compl,
179 r = rados_aio_create_completion(pr, NULL, rados_commit_cb, &rados_compl);
182 r = rados_aio_write(rados->ioctx, target, rados_compl,
186 r = rados_aio_create_completion(pr, rados_ack_cb, NULL, &rados_compl);
189 r = rados_aio_remove(rados->ioctx, target, rados_compl);
192 r = rados_aio_create_completion(pr, rados_ack_cb, NULL, &rados_compl);
195 r = rados_aio_stat(rados->ioctx, target, rados_compl, &rio->size, NULL);
202 rados_aio_release(rados_compl);
207 static int do_aio_read(struct peerd *peer, struct peer_req *pr)
209 struct xseg_request *req = pr->req;
210 struct rados_io *rio = (struct rados_io *) pr->priv;
211 char *data = xseg_get_data(peer->xseg, pr->req);
213 return do_aio_generic(peer, pr, X_READ, rio->obj_name,
214 data + req->serviced,
215 req->size - req->serviced,
216 req->offset + req->serviced);
219 static int do_aio_write(struct peerd *peer, struct peer_req *pr)
221 struct xseg_request *req = pr->req;
222 struct rados_io *rio = (struct rados_io *) pr->priv;
223 char *data = xseg_get_data(peer->xseg, pr->req);
225 return do_aio_generic(peer, pr, X_WRITE, rio->obj_name,
226 data + req->serviced,
227 req->size - req->serviced,
228 req->offset + req->serviced);
231 int handle_delete(struct peerd *peer, struct peer_req *pr)
234 //struct radosd *rados = (struct radosd *) peer->priv;
235 struct rados_io *rio = (struct rados_io *) pr->priv;
237 if (rio->state == ACCEPTED) {
238 XSEGLOG2(&lc, I, "Deleting %s", rio->obj_name);
239 rio->state = PENDING;
240 r = do_aio_generic(peer, pr, X_DELETE, rio->obj_name, NULL, 0, 0);
242 XSEGLOG2(&lc, E, "Deletion of %s failed", rio->obj_name);
248 XSEGLOG2(&lc, E, "Deletion of %s failed", rio->obj_name);
252 XSEGLOG2(&lc, I, "Deletion of %s completed", rio->obj_name);
259 int handle_info(struct peerd *peer, struct peer_req *pr)
262 struct xseg_request *req = pr->req;
263 //struct radosd *rados = (struct radosd *) peer->priv;
264 struct rados_io *rio = (struct rados_io *) pr->priv;
265 char *req_data = xseg_get_data(peer->xseg, req);
266 struct xseg_reply_info *xinfo = (struct xseg_reply_info *)req_data;
268 if (rio->state == ACCEPTED) {
269 XSEGLOG2(&lc, I, "Getting info of %s", rio->obj_name);
270 rio->state = PENDING;
271 r = do_aio_generic(peer, pr, X_INFO, rio->obj_name, NULL, 0, 0);
273 XSEGLOG2(&lc, E, "Getting info of %s failed", rio->obj_name);
280 XSEGLOG2(&lc, E, "Getting info of %s failed", rio->obj_name);
284 xinfo->size = rio->size;
285 pr->retval = sizeof(uint64_t);
286 XSEGLOG2(&lc, I, "Getting info of %s completed", rio->obj_name);
293 int handle_read(struct peerd *peer, struct peer_req *pr)
295 struct rados_io *rio = (struct rados_io *) (pr->priv);
296 struct xseg_request *req = pr->req;
298 if (rio->state == ACCEPTED) {
303 rio->state = READING;
304 XSEGLOG2(&lc, I, "Reading %s", rio->obj_name);
305 if (do_aio_read(peer, pr) < 0) {
306 XSEGLOG2(&lc, I, "Reading of %s failed on do_aio_read",
311 else if (rio->state == READING) {
312 XSEGLOG2(&lc, I, "Reading of %s callback", rio->obj_name);
313 data = xseg_get_data(peer->xseg, pr->req);
315 req->serviced += pr->retval;
316 else if (pr->retval == 0) {
317 XSEGLOG2(&lc, I, "Reading of %s reached end of file at "
318 "%llu bytes. Zeroing out rest", rio->obj_name,
319 (unsigned long long) req->serviced);
320 /* reached end of object. zero out rest of data
321 * requested from this object
323 memset(data + req->serviced, 0, req->datalen - req->serviced);
324 req->serviced = req->datalen;
326 else if (pr->retval == -2) {
327 XSEGLOG2(&lc, I, "Reading of %s return -2. "
328 "Zeroing out data", rio->obj_name);
329 /* object not found. return zeros instead */
330 memset(data, 0, req->datalen);
331 req->serviced = req->datalen;
334 XSEGLOG2(&lc, E, "Reading of %s failed", rio->obj_name);
335 /* pr->retval < 0 && pr->retval != -2 */
339 if (req->serviced >= req->datalen) {
340 XSEGLOG2(&lc, I, "Reading of %s completed", rio->obj_name);
346 /* should not happen */
351 XSEGLOG2(&lc, I, "Resubmitting read of %s", rio->obj_name);
352 if (do_aio_read(peer, pr) < 0) {
353 XSEGLOG2(&lc, E, "Reading of %s failed on do_aio_read",
359 /* should not reach this */
360 printf("read request reached this\n");
366 int handle_write(struct peerd *peer, struct peer_req *pr)
368 struct rados_io *rio = (struct rados_io *) (pr->priv);
369 struct xseg_request *req = pr->req;
370 if (rio->state == ACCEPTED) {
373 if (req->flags & XF_FLUSH) {
382 //should we ensure req->op = X_READ ?
383 rio->state = WRITING;
384 XSEGLOG2(&lc, I, "Writing %s", rio->obj_name);
385 if (do_aio_write(peer, pr) < 0) {
386 XSEGLOG2(&lc, E, "Writing of %s failed on do_aio_write",
391 else if (rio->state == WRITING) {
392 /* rados writes return 0 if write succeeded or < 0 if failed
393 * no resubmission occurs
395 XSEGLOG2(&lc, I, "Writing of %s callback", rio->obj_name);
396 if (pr->retval == 0) {
397 XSEGLOG2(&lc, I, "Writing of %s completed", rio->obj_name);
398 req->serviced = req->datalen;
403 XSEGLOG2(&lc, E, "Writing of %s failed", rio->obj_name);
409 /* should not reach this */
410 printf("write request reached this\n");
416 int handle_copy(struct peerd *peer, struct peer_req *pr)
418 //struct radosd *rados = (struct radosd *) peer->priv;
419 struct xseg_request *req = pr->req;
420 struct rados_io *rio = (struct rados_io *) pr->priv;
422 struct xseg_request_copy *xcopy = (struct xseg_request_copy *)xseg_get_data(peer->xseg, req);
424 if (rio->state == ACCEPTED){
425 XSEGLOG2(&lc, I, "Copy of object %s to object %s started",
426 rio->second_name, rio->obj_name);
428 complete(peer, pr); //or fail?
432 rio->second_name = malloc(MAX_OBJ_NAME + 1);
433 if (!rio->second_name){
437 //NULL terminate or fail if targetlen > MAX_OBJ_NAME ?
438 unsigned int end = (xcopy->targetlen > MAX_OBJ_NAME) ? MAX_OBJ_NAME : xcopy->targetlen;
439 strncpy(rio->second_name, xcopy->target, end);
440 rio->second_name[end] = 0;
442 rio->buf = malloc(req->size);
448 rio->state = READING;
450 XSEGLOG2(&lc, I, "Reading %s", rio->second_name);
451 if (do_aio_generic(peer, pr, X_READ, rio->second_name, rio->buf + rio->read,
452 req->size - rio->read, req->offset + rio->read) < 0) {
453 XSEGLOG2(&lc, I, "Reading of %s failed on do_aio_read", rio->obj_name);
459 else if (rio->state == READING){
460 XSEGLOG2(&lc, I, "Reading of %s callback", rio->obj_name);
462 rio->read += pr->retval;
463 else if (pr->retval == 0) {
464 XSEGLOG2(&lc, I, "Reading of %s reached end of file at "
465 "%llu bytes. Zeroing out rest", rio->obj_name,
466 (unsigned long long) req->serviced);
467 memset(rio->buf + rio->read, 0, req->size - rio->read);
468 rio->read = req->size ;
471 XSEGLOG2(&lc, E, "Reading of %s failed", rio->second_name);
476 if (rio->read >= req->size) {
477 XSEGLOG2(&lc, I, "Reading of %s completed", rio->obj_name);
479 rio->state = WRITING;
480 XSEGLOG2(&lc, I, "Writing %s", rio->obj_name);
481 if (do_aio_generic(peer, pr, X_WRITE, rio->obj_name,
482 rio->buf, req->size, req->offset) < 0) {
483 XSEGLOG2(&lc, E, "Writing of %s failed on do_aio_write", rio->obj_name);
490 XSEGLOG2(&lc, I, "Resubmitting read of %s", rio->obj_name);
491 if (do_aio_generic(peer, pr, X_READ, rio->second_name, rio->buf + rio->read,
492 req->size - rio->read, req->offset + rio->read) < 0) {
493 XSEGLOG2(&lc, E, "Reading of %s failed on do_aio_read",
499 else if (rio->state == WRITING){
500 XSEGLOG2(&lc, I, "Writing of %s callback", rio->obj_name);
501 if (pr->retval == 0) {
502 XSEGLOG2(&lc, I, "Writing of %s completed", rio->obj_name);
503 XSEGLOG2(&lc, I, "Copy of object %s to object %s completed", rio->second_name, rio->obj_name);
504 req->serviced = req->size;
509 XSEGLOG2(&lc, E, "Writing of %s failed", rio->obj_name);
510 XSEGLOG2(&lc, E, "Copy of object %s to object %s failed", rio->second_name, rio->obj_name);
516 XSEGLOG2(&lc, E, "Unknown state");
524 free(rio->second_name);
527 rio->second_name = NULL;
537 int handle_snapshot(struct peerd *peer, struct peer_req *pr)
539 //struct radosd *rados = (struct radosd *) peer->priv;
540 struct xseg_request *req = pr->req;
541 struct rados_io *rio = (struct rados_io *) pr->priv;
544 if (rio->state == ACCEPTED){
545 struct xseg_request_snapshot *xsnapshot;
546 xsnapshot = (struct xseg_request_snapshot *)xseg_get_data(peer->xseg, req);
547 (void)xsnapshot; //ignore it
548 XSEGLOG2(&lc, I, "Starting snapshot of object %s", rio->obj_name);
550 complete(peer, pr); //or fail?
554 rio->second_name = malloc(MAX_OBJ_NAME+1);
555 if (!rio->second_name){
558 rio->buf = malloc(req->size);
563 rio->state = READING;
565 XSEGLOG2(&lc, I, "Reading %s", rio->obj_name);
566 if (do_aio_generic(peer, pr, X_READ, rio->obj_name, rio->buf + rio->read,
567 req->size - rio->read, req->offset + rio->read) < 0) {
568 XSEGLOG2(&lc, I, "Reading of %s failed on do_aio_read", rio->obj_name);
574 else if (rio->state == READING){
575 XSEGLOG2(&lc, I, "Reading of %s callback", rio->obj_name);
577 rio->read += pr->retval;
579 XSEGLOG2(&lc, E, "Reading of %s failed", rio->second_name);
584 if (!pr->retval || rio->read >= req->size) {
585 XSEGLOG2(&lc, I, "Reading of %s completed", rio->obj_name);
586 //rstrip here in case zeros were written in the end
587 uint64_t trailing_zeros = 0;
588 for (;trailing_zeros < rio->read; trailing_zeros++)
589 if (rio->buf[rio->read-trailing_zeros -1])
591 XSEGLOG2(&lc, D, "Read %llu, Trainling zeros %llu",
592 rio->read, trailing_zeros);
594 rio->read -= trailing_zeros;
595 //calculate snapshot name
596 unsigned char sha[SHA256_DIGEST_SIZE];
597 SHA256((unsigned char *) rio->buf, rio->read, sha);
598 r = xseg_resize_request(peer->xseg, pr->req, pr->req->targetlen,
599 sizeof(struct xseg_reply_snapshot));
601 struct xseg_reply_snapshot *xreply;
602 xreply = (struct xseg_reply_snapshot *)xseg_get_data(peer->xseg, req);
603 hexlify(sha, xreply->target);
604 xreply->targetlen = HEXLIFIED_SHA256_DIGEST_SIZE;
606 strncpy(rio->second_name, xreply->target, xreply->targetlen);
607 rio->second_name[xreply->targetlen] = 0;
608 XSEGLOG2(&lc, I, "Calculated %s as snapshot of %s",
609 rio->second_name, rio->obj_name);
612 rio->state = STATING;
613 r = do_aio_generic(peer, pr, X_INFO, rio->second_name, NULL, 0, 0);
615 XSEGLOG2(&lc, E, "Stating %s failed", rio->second_name_name);
621 XSEGLOG2(&lc, I, "Resubmitting read of %s", rio->obj_name);
622 if (do_aio_generic(peer, pr, X_READ, rio->obj_name, rio->buf + rio->read,
623 req->size - rio->read, req->offset + rio->read) < 0) {
624 XSEGLOG2(&lc, E, "Reading of %s failed on do_aio_read",
630 } else if (rio->state == STATING){
633 XSEGLOG2(&lc, I, "Stating %s failed. Writing.",
635 rio->state = WRITING;
636 if (do_aio_generic(peer, pr, X_WRITE, rio->second_name,
637 rio->buf, rio->read, 0) < 0) {
638 XSEGLOG2(&lc, E, "Writing of %s failed on do_aio_write", rio->second_name);
645 XSEGLOG2(&lc, I, "Stating %s completed Successfully."
646 "No need to write.", rio->second_name);
652 else if (rio->state == WRITING){
653 struct xseg_reply_snapshot *xreply;
654 xreply = (struct xseg_reply_snapshot *)xseg_get_data(peer->xseg, req);
656 XSEGLOG2(&lc, I, "Writing of %s callback", rio->obj_name);
657 if (pr->retval == 0) {
658 XSEGLOG2(&lc, I, "Writing of %s completed", rio->second_name);
659 XSEGLOG2(&lc, I, "Snapshot of object %s to object %s completed",
660 rio->obj_name, rio->second_name);
661 req->serviced = req->size;
666 XSEGLOG2(&lc, E, "Writing of %s failed", rio->obj_name);
667 XSEGLOG2(&lc, E, "Snapshot of object %s failed",
674 XSEGLOG2(&lc, E, "Unknown state");
682 free(rio->second_name);
685 rio->second_name = NULL;
695 int spawnthread(struct peerd *peer, struct peer_req *pr,
696 void *(*func)(void *arg))
698 //struct radosd *rados = (struct radosd *) peer->priv;
699 struct rados_io *rio = (struct rados_io *) (pr->priv);
702 pthread_attr_init(&attr);
703 pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
705 return (pthread_create(&rio->tid, &attr, func, (void *) pr));
708 void watch_cb(uint8_t opcode, uint64_t ver, void *arg)
711 struct peer_req *pr = (struct peer_req *)arg;
712 //struct radosd *rados = (struct radosd *) pr->peer->priv;
713 struct rados_io *rio = (struct rados_io *) (pr->priv);
715 if (pr->req->op == X_ACQUIRE){
716 XSEGLOG2(&lc, I, "watch cb signaling rio of %s", rio->obj_name);
717 pthread_cond_signal(&rio->cond);
720 XSEGLOG2(&lc, E, "Invalid req op in watch_cb");
723 void * lock_op(void *arg)
725 struct peer_req *pr = (struct peer_req *)arg;
726 struct radosd *rados = (struct radosd *) pr->peer->priv;
727 struct rados_io *rio = (struct rados_io *) (pr->priv);
728 uint32_t len = strlen(rio->obj_name);
729 strncpy(rio->obj_name + len, LOCK_SUFFIX, LOCK_SUFFIX_LEN);
730 rio->obj_name[len + LOCK_SUFFIX_LEN] = 0;
732 XSEGLOG2(&lc, I, "Starting lock op for %s", rio->obj_name);
733 if (!(pr->req->flags & XF_NOSYNC)){
734 if (rados_watch(rados->ioctx, rio->obj_name, 0,
735 &rio->watch_handle, watch_cb, pr) < 0){
736 XSEGLOG2(&lc, E, "Rados watch failed for %s",
743 while(rados_lock(rados->ioctx, rio->obj_name, RADOS_LOCK_NAME,
744 C_LOCK_EXCLUSIVE, RADOS_LOCK_COOKIE, "", "", NULL, 0) < 0){
745 if (pr->req->flags & XF_NOSYNC){
746 XSEGLOG2(&lc, E, "Rados lock failed for %s",
752 XSEGLOG2(&lc, D, "rados lock for %s sleeping",
754 pthread_mutex_lock(&rio->m);
755 pthread_cond_wait(&rio->cond, &rio->m);
756 pthread_mutex_unlock(&rio->m);
757 XSEGLOG2(&lc, D, "rados lock for %s woke up",
761 if (!(pr->req->flags & XF_NOSYNC)){
762 if (rados_unwatch(rados->ioctx, rio->obj_name,
763 rio->watch_handle) < 0){
764 XSEGLOG2(&lc, E, "Rados unwatch failed");
767 XSEGLOG2(&lc, I, "Successfull lock op for %s", rio->obj_name);
768 complete(pr->peer, pr);
772 void * unlock_op(void *arg)
774 struct peer_req *pr = (struct peer_req *)arg;
775 struct radosd *rados = (struct radosd *) pr->peer->priv;
776 struct rados_io *rio = (struct rados_io *) (pr->priv);
777 uint32_t len = strlen(rio->obj_name);
778 strncpy(rio->obj_name + len, LOCK_SUFFIX, LOCK_SUFFIX_LEN);
779 rio->obj_name[len + LOCK_SUFFIX_LEN] = 0;
781 XSEGLOG2(&lc, I, "Starting unlock op for %s", rio->obj_name);
782 if (pr->req->flags & XF_FORCE)
783 r = rados_break_lock(rados->ioctx, rio->obj_name, RADOS_LOCK_NAME,
786 r = rados_unlock(rados->ioctx, rio->obj_name, RADOS_LOCK_NAME,
789 XSEGLOG2(&lc, E, "Rados unlock failed for %s (r: %d)", rio->obj_name, r);
793 if (rados_notify(rados->ioctx, rio->obj_name,
795 XSEGLOG2(&lc, E, "rados notify failed");
797 XSEGLOG2(&lc, I, "Successfull unlock op for %s", rio->obj_name);
798 complete(pr->peer, pr);
803 int handle_acquire(struct peerd *peer, struct peer_req *pr)
805 int r = spawnthread(peer, pr, lock_op);
812 int handle_release(struct peerd *peer, struct peer_req *pr)
814 int r = spawnthread(peer, pr, unlock_op);
820 int custom_peer_init(struct peerd *peer, int argc, char *argv[])
823 struct radosd *rados = malloc(sizeof(struct radosd));
824 struct rados_io *rio;
831 BEGIN_READ_ARGS(argc, argv);
832 READ_ARG_STRING("--pool", rados->pool, MAX_POOL_NAME);
835 if (!rados->pool[0]){
836 XSEGLOG2(&lc, E , "Pool must be provided");
842 if (rados_create(&rados->cluster, NULL) < 0) {
843 XSEGLOG2(&lc, E, "Rados create failed!");
846 if (rados_conf_read_file(rados->cluster, NULL) < 0){
847 XSEGLOG2(&lc, E, "Error reading rados conf files!");
850 if (rados_connect(rados->cluster) < 0) {
851 XSEGLOG2(&lc, E, "Rados connect failed!");
852 rados_shutdown(rados->cluster);
856 if (rados_pool_lookup(rados->cluster, rados->pool) < 0) {
857 XSEGLOG2(&lc, E, "Pool does not exists. Try creating it first");
858 rados_shutdown(rados->cluster);
862 if (rados_pool_create(rados->cluster, rados->pool) < 0){
863 XSEGLOG2(&lc, E, "Couldn't create pool %s", rados->pool);
864 rados_shutdown(rados->cluster);
868 XSEGLOG2(&lc, I, "Pool created.");
872 if (rados_ioctx_create(rados->cluster, rados->pool, &(rados->ioctx)) < 0){
873 XSEGLOG2(&lc, E, "ioctx create problem.");
874 rados_shutdown(rados->cluster);
878 peer->priv = (void *) rados;
879 for (i = 0; i < peer->nr_ops; i++) {
880 rio = malloc(sizeof(struct rados_io));
883 //is this really necessary?
884 for (j = 0; j < i; j++) {
885 free(peer->peer_reqs[j].priv);
894 rio->second_name = 0;
895 rio->watch_handle = 0;
896 pthread_cond_init(&rio->cond, NULL);
897 pthread_mutex_init(&rio->m, NULL);
898 peer->peer_reqs[i].priv = (void *) rio;
903 // nothing to do here for now
904 int custom_arg_parse(int argc, const char *argv[])
909 void custom_peer_finalize(struct peerd *peer)
914 int dispatch(struct peerd *peer, struct peer_req *pr, struct xseg_request *req,
915 enum dispatch_reason reason)
917 struct rados_io *rio = (struct rados_io *) (pr->priv);
918 char *target = xseg_get_target(peer->xseg, pr->req);
919 unsigned int end = (pr->req->targetlen > MAX_OBJ_NAME) ? MAX_OBJ_NAME : pr->req->targetlen;
920 strncpy(rio->obj_name, target, end);
921 rio->obj_name[end] = 0;
922 //log_pr("dispatch", pr);
923 if (reason == dispatch_accept)
924 rio->state = ACCEPTED;
926 switch (pr->req->op){
928 handle_read(peer, pr); break;
930 handle_write(peer, pr); break;
933 defer_request(peer, pr);
935 handle_delete(peer, pr);
939 defer_request(peer, pr);
941 handle_info(peer, pr);
945 defer_request(peer, pr);
947 handle_copy(peer, pr);
950 handle_acquire(peer, pr); break;
952 handle_release(peer, pr); break;
954 handle_snapshot(peer, pr); break;