2 * Copyright 2012 GRNET S.A. All rights reserved.
4 * Redistribution and use in source and binary forms, with or
5 * without modification, are permitted provided that the following
8 * 1. Redistributions of source code must retain the above
9 * copyright notice, this list of conditions and the following
11 * 2. Redistributions in binary form must reproduce the above
12 * copyright notice, this list of conditions and the following
13 * disclaimer in the documentation and/or other materials
14 * provided with the distribution.
16 * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17 * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19 * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20 * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23 * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24 * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26 * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27 * POSSIBILITY OF SUCH DAMAGE.
29 * The views and conclusions contained in the software and
30 * documentation are those of the authors and should not be
31 * interpreted as representing official policies, either expressed
32 * or implied, of GRNET S.A.
38 #include <xseg/xseg.h>
40 #include <rados/librados.h>
41 #include <xseg/protocol.h>
43 #include <openssl/sha.h>
// Suffix (and its length) that lock_op() and unlock_op() append to the
// object name of a request to obtain the name of the lock object.
49 #define LOCK_SUFFIX "_lock"
50 #define LOCK_SUFFIX_LEN 5
// Suffix (and its length) that handle_hash() appends to the object name to
// obtain the name of the object holding a stored hash value.
51 #define HASH_SUFFIX "_hash"
52 #define HASH_SUFFIX_LEN 5
// Maximum pool name length; the pool buffer is declared with one extra byte.
54 #define MAX_POOL_NAME 64
// Maximum object name length: an xseg target plus a lock suffix. Name
// buffers are declared or allocated with MAX_OBJ_NAME + 1 bytes.
55 #define MAX_OBJ_NAME (XSEG_MAX_TARGETLEN + LOCK_SUFFIX_LEN + 1)
// Lock name handed to the rados lock, unlock, list-lockers and break-lock calls.
56 #define RADOS_LOCK_NAME "RadosLock"
57 //#define RADOS_LOCK_COOKIE "Cookie"
// Cookie handed to rados_lock_exclusive() and rados_break_lock().
58 #define RADOS_LOCK_COOKIE "foo"
// Tag that break_lock() expects rados_list_lockers() to report.
59 #define RADOS_LOCK_TAG ""
// Description handed to rados_lock_exclusive().
60 #define RADOS_LOCK_DESC ""
// Prints the command line options specific to this peer (the --pool
// option) to stderr.
62 void custom_peer_usage()
64 fprintf(stderr, "Custom peer options:\n"
65 "--pool: Rados pool to connect\n"
// Pool name, accessed as rados->pool; filled from the --pool argument in
// custom_peer_init().
82 char pool[MAX_POOL_NAME + 1];
// Per-request fields, accessed through rio in the handlers.
// Object name of the request; dispatch() copies the request target here
// and NUL-terminates it when the request is accepted.
86 char obj_name[MAX_OBJ_NAME + 1];
// Current step of the request state machine (ACCEPTED, PENDING, READING,
// WRITING, ... as used by the handlers).
87 enum rados_state state;
// Heap allocations made by handle_copy() and handle_hash(): second_name is
// a secondary object name (copy source or hex hash), buf a data buffer of
// req->size bytes.
89 char *second_name, *buf;
// Handle filled in by rados_watch() and handed back to rados_unwatch()
// in lock_op().
91 uint64_t watch_handle;
// Completion callback that do_aio_generic() registers for its read, remove
// and stat operations.
// c:   the finished rados completion.
// arg: the peer_req that was handed to rados_aio_create_completion().
// Reads the result of the finished operation, releases the completion and
// re-enters dispatch() with reason dispatch_internal so that the handler
// of the request can take its next step.
97 void rados_ack_cb(rados_completion_t c, void *arg)
99 struct peer_req *pr = (struct peer_req*) arg;
100 struct peerd *peer = pr->peer;
// Result of the asynchronous operation. Presumably stored in pr->retval,
// which the handlers inspect -- TODO confirm.
101 int ret = rados_aio_get_return_value(c);
103 rados_aio_release(c);
104 dispatch(peer, pr, pr->req, dispatch_internal);
// Completion callback that do_aio_generic() registers for its write
// operation; it is passed in the second callback slot of
// rados_aio_create_completion() instead of the first one.
// c:   the finished rados completion.
// arg: the peer_req that was handed to rados_aio_create_completion().
// Reads the result of the write, releases the completion and re-enters
// dispatch() with reason dispatch_internal.
107 void rados_commit_cb(rados_completion_t c, void *arg)
109 struct peer_req *pr = (struct peer_req*) arg;
110 struct peerd *peer = pr->peer;
// Result of the asynchronous write. Presumably stored in pr->retval,
// which the handlers inspect -- TODO confirm.
111 int ret = rados_aio_get_return_value(c);
113 rados_aio_release(c);
114 dispatch(peer, pr, pr->req, dispatch_internal);
// Submits one asynchronous rados operation on behalf of request pr.
// peer:   peer whose private data holds the rados io context.
// pr:     request; handed to the completion as callback argument.
// op:     operation selector. Callers in this file pass X_READ, X_WRITE,
//         X_DELETE or X_INFO; presumably these select the read, write,
//         remove and stat calls below in that order -- TODO confirm.
// target: name of the rados object to operate on.
// buf, size, offset: data buffer and byte range; callers pass NULL, 0, 0
//         for X_DELETE and X_INFO.
// Returns an int; callers treat a negative value as a failed submission.
117 static int do_aio_generic(struct peerd *peer, struct peer_req *pr, uint32_t op,
118 char *target, char *buf, uint64_t size, uint64_t offset)
120 struct radosd *rados = (struct radosd *) peer->priv;
121 struct rados_io *rio = (struct rados_io *) pr->priv;
124 rados_completion_t rados_compl;
// Read: completion reports through rados_ack_cb.
127 r = rados_aio_create_completion(pr, rados_ack_cb, NULL, &rados_compl);
130 r = rados_aio_read(rados->ioctx, target, rados_compl,
// Write: completion reports through rados_commit_cb (second callback slot).
134 r = rados_aio_create_completion(pr, NULL, rados_commit_cb, &rados_compl);
137 r = rados_aio_write(rados->ioctx, target, rados_compl,
// Remove: completion reports through rados_ack_cb.
141 r = rados_aio_create_completion(pr, rados_ack_cb, NULL, &rados_compl);
144 r = rados_aio_remove(rados->ioctx, target, rados_compl);
// Stat: the object size is delivered into rio->size; the modification time
// is not requested (NULL).
147 r = rados_aio_create_completion(pr, rados_ack_cb, NULL, &rados_compl);
150 r = rados_aio_stat(rados->ioctx, target, rados_compl, &rio->size, NULL);
// Presumably reached only when the submission failed, so that the unused
// completion is not leaked -- TODO confirm.
157 rados_aio_release(rados_compl);
// Submits an asynchronous read for the part of the request that has not
// been serviced yet. The data lands in the data area of the request,
// starting req->serviced bytes in; the object range starts at
// req->offset + req->serviced and is req->size - req->serviced bytes long.
// Returns the result of do_aio_generic().
162 static int do_aio_read(struct peerd *peer, struct peer_req *pr)
164 struct xseg_request *req = pr->req;
165 struct rados_io *rio = (struct rados_io *) pr->priv;
166 char *data = xseg_get_data(peer->xseg, pr->req);
168 return do_aio_generic(peer, pr, X_READ, rio->obj_name,
169 data + req->serviced,
170 req->size - req->serviced,
171 req->offset + req->serviced);
// Submits an asynchronous write for the part of the request that has not
// been serviced yet. The data is taken from the data area of the request,
// starting req->serviced bytes in; the object range starts at
// req->offset + req->serviced and is req->size - req->serviced bytes long.
// Returns the result of do_aio_generic().
174 static int do_aio_write(struct peerd *peer, struct peer_req *pr)
176 struct xseg_request *req = pr->req;
177 struct rados_io *rio = (struct rados_io *) pr->priv;
178 char *data = xseg_get_data(peer->xseg, pr->req);
180 return do_aio_generic(peer, pr, X_WRITE, rio->obj_name,
181 data + req->serviced,
182 req->size - req->serviced,
183 req->offset + req->serviced);
// Handles a delete request for rio->obj_name in two steps driven by
// rio->state. The function is entered once when the request is accepted
// and again when the asynchronous remove completes.
186 int handle_delete(struct peerd *peer, struct peer_req *pr)
189 //struct radosd *rados = (struct radosd *) peer->priv;
190 struct rados_io *rio = (struct rados_io *) pr->priv;
// First entry: mark the request PENDING and submit the asynchronous remove.
192 if (rio->state == ACCEPTED) {
193 XSEGLOG2(&lc, I, "Deleting %s", rio->obj_name);
194 rio->state = PENDING;
195 r = do_aio_generic(peer, pr, X_DELETE, rio->obj_name, NULL, 0, 0);
// Logged when the submission itself failed.
197 XSEGLOG2(&lc, E, "Deletion of %s failed", rio->obj_name);
// Later entry: outcome of the completed remove. Presumably chosen from
// pr->retval -- TODO confirm.
203 XSEGLOG2(&lc, E, "Deletion of %s failed", rio->obj_name);
207 XSEGLOG2(&lc, I, "Deletion of %s completed", rio->obj_name);
// Handles an info (object size) request for rio->obj_name in two steps
// driven by rio->state: submit an asynchronous stat, then place the size
// in a struct xseg_reply_info in the data area of the request.
214 int handle_info(struct peerd *peer, struct peer_req *pr)
217 struct xseg_request *req = pr->req;
218 //struct radosd *rados = (struct radosd *) peer->priv;
219 struct rados_io *rio = (struct rados_io *) pr->priv;
221 struct xseg_reply_info *xinfo;
// Scratch copy of the target name, used around the resize below.
222 char buf[XSEG_MAX_TARGETLEN + 1];
// First entry: mark the request PENDING and submit the asynchronous stat.
225 if (rio->state == ACCEPTED) {
226 XSEGLOG2(&lc, I, "Getting info of %s", rio->obj_name);
227 rio->state = PENDING;
228 r = do_aio_generic(peer, pr, X_INFO, rio->obj_name, NULL, 0, 0);
230 XSEGLOG2(&lc, E, "Getting info of %s failed", rio->obj_name);
// Later entry: when the data area is too small for the reply, save the
// target name, resize the request and write the name back. Presumably the
// resize does not preserve the target bytes -- TODO confirm.
235 if (req->datalen < sizeof(struct xseg_reply_info)) {
236 target = xseg_get_target(peer->xseg, req);
237 strncpy(buf, target, req->targetlen);
238 r = xseg_resize_request(peer->xseg, req, req->targetlen, sizeof(struct xseg_reply_info));
240 XSEGLOG2(&lc, E, "Cannot resize request");
244 target = xseg_get_target(peer->xseg, req);
245 strncpy(target, buf, req->targetlen);
248 req_data = xseg_get_data(peer->xseg, req);
249 xinfo = (struct xseg_reply_info *)req_data;
252 XSEGLOG2(&lc, E, "Getting info of %s failed", rio->obj_name);
// rio->size was filled in by the stat submitted through do_aio_generic().
256 xinfo->size = rio->size;
257 pr->retval = sizeof(uint64_t);
258 XSEGLOG2(&lc, I, "Getting info of %s completed", rio->obj_name);
// Handles a read request for rio->obj_name as a state machine driven by
// rio->state. The first entry submits an asynchronous read; each later
// entry accounts for the bytes delivered (pr->retval) and either finishes
// the request or submits another read for the remainder. A short object
// and a missing object are both answered with zero-filled data.
265 int handle_read(struct peerd *peer, struct peer_req *pr)
267 struct rados_io *rio = (struct rados_io *) (pr->priv);
268 struct xseg_request *req = pr->req;
// The data area of the request must be able to hold req->size bytes.
271 if (req->datalen < req->size) {
272 XSEGLOG2(&lc, E, "Request datalen is less than req size");
// First entry: start reading.
276 if (rio->state == ACCEPTED) {
281 rio->state = READING;
282 XSEGLOG2(&lc, I, "Reading %s", rio->obj_name);
283 if (do_aio_read(peer, pr) < 0) {
284 XSEGLOG2(&lc, I, "Reading of %s failed on do_aio_read",
// Later entries: a read completed, pr->retval carries its result.
289 else if (rio->state == READING) {
290 XSEGLOG2(&lc, I, "Reading of %s callback", rio->obj_name);
291 data = xseg_get_data(peer->xseg, pr->req);
293 req->serviced += pr->retval;
294 else if (pr->retval == 0) {
295 XSEGLOG2(&lc, I, "Reading of %s reached end of file at "
296 "%llu bytes. Zeroing out rest", rio->obj_name,
297 (unsigned long long) req->serviced);
298 /* reached end of object. zero out rest of data
299 * requested from this object
301 memset(data + req->serviced, 0, req->size - req->serviced);
302 req->serviced = req->size;
304 else if (pr->retval == -2) {
305 XSEGLOG2(&lc, I, "Reading of %s return -2. "
306 "Zeroing out data", rio->obj_name);
307 /* object not found. return zeros instead */
308 memset(data, 0, req->size);
309 req->serviced = req->size;
312 XSEGLOG2(&lc, E, "Reading of %s failed", rio->obj_name);
313 /* pr->retval < 0 && pr->retval != -2 */
// Everything requested has been delivered or zero-filled.
317 if (req->serviced >= req->size) {
318 XSEGLOG2(&lc, I, "Reading of %s completed", rio->obj_name);
324 /* should not happen */
// Partial read: ask for the remainder.
329 XSEGLOG2(&lc, I, "Resubmitting read of %s", rio->obj_name);
330 if (do_aio_read(peer, pr) < 0) {
331 XSEGLOG2(&lc, E, "Reading of %s failed on do_aio_read",
337 /* should not reach this */
338 printf("read request reached this\n");
// Handles a write request for rio->obj_name as a state machine driven by
// rio->state. The first entry submits an asynchronous write of the data
// area of the request; the later entry inspects pr->retval, where 0 means
// the whole write succeeded.
344 int handle_write(struct peerd *peer, struct peer_req *pr)
// The data area of the request must hold at least req->size bytes.
346 if (pr->req->datalen < pr->req->size) {
347 XSEGLOG2(&lc, E, "Request datalen is less than req size");
351 struct rados_io *rio = (struct rados_io *) (pr->priv);
352 struct xseg_request *req = pr->req;
// First entry. Requests flagged XF_FLUSH take a separate branch.
353 if (rio->state == ACCEPTED) {
356 if (req->flags & XF_FLUSH) {
365 //should we ensure req->op = X_READ ?
366 rio->state = WRITING;
367 XSEGLOG2(&lc, I, "Writing %s", rio->obj_name);
368 if (do_aio_write(peer, pr) < 0) {
369 XSEGLOG2(&lc, E, "Writing of %s failed on do_aio_write",
374 else if (rio->state == WRITING) {
375 /* rados writes return 0 if write succeeded or < 0 if failed
376 * no resubmission occurs
378 XSEGLOG2(&lc, I, "Writing of %s callback", rio->obj_name);
379 if (pr->retval == 0) {
380 XSEGLOG2(&lc, I, "Writing of %s completed", rio->obj_name);
381 req->serviced = req->size;
386 XSEGLOG2(&lc, E, "Writing of %s failed", rio->obj_name);
392 /* should not reach this */
393 printf("write request reached this\n");
// Handles a copy request as a state machine driven by rio->state. The
// source object name comes from the struct xseg_request_copy in the data
// area of the request, the destination is rio->obj_name.
//   ACCEPTED - allocate rio->second_name (source name) and rio->buf
//              (req->size bytes) and start reading the source.
//   READING  - accumulate the source data in rio->buf, zero-filling past
//              the end of the source; once req->size bytes are available
//              write them to the destination, otherwise read again.
//   WRITING  - pr->retval == 0 means the copy succeeded.
// rio->second_name is freed and reset at the end of the function.
399 int handle_copy(struct peerd *peer, struct peer_req *pr)
401 //struct radosd *rados = (struct radosd *) peer->priv;
402 struct xseg_request *req = pr->req;
403 struct rados_io *rio = (struct rados_io *) pr->priv;
405 struct xseg_request_copy *xcopy = (struct xseg_request_copy *)xseg_get_data(peer->xseg, req);
407 if (rio->state == ACCEPTED){
// NOTE(review): rio->second_name is logged here but is only allocated and
// filled further down in this branch -- confirm it is valid at this point.
408 XSEGLOG2(&lc, I, "Copy of object %s to object %s started",
409 rio->second_name, rio->obj_name);
411 complete(peer, pr); //or fail?
415 rio->second_name = malloc(MAX_OBJ_NAME + 1);
416 if (!rio->second_name){
// Copy the source name, truncated to MAX_OBJ_NAME, and terminate it.
420 //NULL terminate or fail if targetlen > MAX_OBJ_NAME ?
421 unsigned int end = (xcopy->targetlen > MAX_OBJ_NAME) ? MAX_OBJ_NAME : xcopy->targetlen;
422 strncpy(rio->second_name, xcopy->target, end);
423 rio->second_name[end] = 0;
425 rio->buf = malloc(req->size);
431 rio->state = READING;
433 XSEGLOG2(&lc, I, "Reading %s", rio->second_name);
434 if (do_aio_generic(peer, pr, X_READ, rio->second_name, rio->buf + rio->read,
435 req->size - rio->read, req->offset + rio->read) < 0) {
436 XSEGLOG2(&lc, I, "Reading of %s failed on do_aio_read", rio->obj_name);
// A read of the source completed; pr->retval carries its result.
442 else if (rio->state == READING){
443 XSEGLOG2(&lc, I, "Reading of %s callback", rio->obj_name);
445 rio->read += pr->retval;
446 else if (pr->retval == 0) {
447 XSEGLOG2(&lc, I, "Reading of %s reached end of file at "
448 "%llu bytes. Zeroing out rest", rio->obj_name,
449 (unsigned long long) req->serviced);
450 memset(rio->buf + rio->read, 0, req->size - rio->read);
451 rio->read = req->size ;
454 XSEGLOG2(&lc, E, "Reading of %s failed", rio->second_name);
// Whole source range buffered: write it to the destination in one go.
459 if (rio->read >= req->size) {
460 XSEGLOG2(&lc, I, "Reading of %s completed", rio->obj_name);
462 rio->state = WRITING;
463 XSEGLOG2(&lc, I, "Writing %s", rio->obj_name);
464 if (do_aio_generic(peer, pr, X_WRITE, rio->obj_name,
465 rio->buf, req->size, req->offset) < 0) {
466 XSEGLOG2(&lc, E, "Writing of %s failed on do_aio_write", rio->obj_name);
// Partial read: ask for the remainder of the source range.
473 XSEGLOG2(&lc, I, "Resubmitting read of %s", rio->obj_name);
474 if (do_aio_generic(peer, pr, X_READ, rio->second_name, rio->buf + rio->read,
475 req->size - rio->read, req->offset + rio->read) < 0) {
476 XSEGLOG2(&lc, E, "Reading of %s failed on do_aio_read",
// The write to the destination completed.
482 else if (rio->state == WRITING){
483 XSEGLOG2(&lc, I, "Writing of %s callback", rio->obj_name);
484 if (pr->retval == 0) {
485 XSEGLOG2(&lc, I, "Writing of %s completed", rio->obj_name);
486 XSEGLOG2(&lc, I, "Copy of object %s to object %s completed", rio->second_name, rio->obj_name);
487 req->serviced = req->size;
492 XSEGLOG2(&lc, E, "Writing of %s failed", rio->obj_name);
493 XSEGLOG2(&lc, E, "Copy of object %s to object %s failed", rio->second_name, rio->obj_name);
499 XSEGLOG2(&lc, E, "Unknown state");
// Cleanup of the per-request allocation.
507 free(rio->second_name);
510 rio->second_name = NULL;
// Handles a hash request for rio->obj_name as a state machine driven by
// rio->state; every asynchronous completion re-enters this function.
//   ACCEPTED    - allocate rio->second_name and rio->buf, build the name
//                 obj_name + HASH_SUFFIX and read a stored hash from it
//                 into second_name.
//   PREHASHING  - if that read filled second_name, reply with the stored
//                 hash; otherwise start reading the object itself.
//   READING     - accumulate data in rio->buf; when done, drop trailing
//                 zero bytes, compute SHA256, hexlify it into second_name,
//                 fill the reply and stat the object named by the hash.
//   STATING     - if the stat failed, write the data to the object named
//                 by the hash; otherwise the request is served.
//   WRITING     - on success, write the hash to obj_name + HASH_SUFFIX.
//   POSTHASHING - log the outcome of that write and mark the request served.
// rio->second_name is freed and reset at the end of the function.
520 int handle_hash(struct peerd *peer, struct peer_req *pr)
522 //struct radosd *rados = (struct radosd *) peer->priv;
523 struct xseg_request *req = pr->req;
524 struct rados_io *rio = (struct rados_io *) pr->priv;
525 uint64_t trailing_zeros = 0;
526 unsigned char sha[SHA256_DIGEST_SIZE];
527 struct xseg_reply_hash *xreply;
// NOTE(review): hash_name holds HEXLIFIED_SHA256_DIGEST_SIZE + 1 bytes but
// receives obj_name plus HASH_SUFFIX below, and obj_name can be up to
// MAX_OBJ_NAME bytes long -- confirm these copies cannot overflow.
529 char hash_name[HEXLIFIED_SHA256_DIGEST_SIZE + 1];
532 if (rio->state == ACCEPTED){
533 XSEGLOG2(&lc, I, "Starting hashing of object %s", rio->obj_name);
535 fail(peer, pr); //or fail?
539 rio->second_name = malloc(MAX_OBJ_NAME+1);
540 if (!rio->second_name){
543 rio->buf = malloc(req->size);
// An empty second_name after the read below means no stored hash was found.
549 rio->second_name[0] = 0;
550 rio->state = PREHASHING;
// Build obj_name + HASH_SUFFIX in hash_name; pos tracks the write position.
552 strncpy(hash_name, rio->obj_name, strlen(rio->obj_name));
553 pos += strlen(rio->obj_name);
554 strncpy(hash_name+pos, HASH_SUFFIX, HASH_SUFFIX_LEN);
555 pos += HASH_SUFFIX_LEN;
558 if (do_aio_generic(peer, pr, X_READ, hash_name, rio->second_name,
559 HEXLIFIED_SHA256_DIGEST_SIZE, 0) < 0) {
560 XSEGLOG2(&lc, I, "Reading of %s failed on do_aio_read", rio->obj_name);
565 } else if (rio->state == PREHASHING) {
566 if (rio->second_name[0] != 0) {
567 XSEGLOG2(&lc, D, "Precalculated hash found");
// NOTE(review): xreply is fetched before xseg_resize_request() -- confirm
// the data pointer stays valid after the resize.
568 xreply = (struct xseg_reply_hash*)xseg_get_data(peer->xseg, req);
569 r = xseg_resize_request(peer->xseg, pr->req, pr->req->targetlen,
570 sizeof(struct xseg_reply_hash));
571 strncpy(xreply->target, rio->second_name, HEXLIFIED_SHA256_DIGEST_SIZE);
572 xreply->targetlen = HEXLIFIED_SHA256_DIGEST_SIZE;
574 XSEGLOG2(&lc, I, "Calculated %s as hash of %s",
575 rio->second_name, rio->obj_name);
576 req->serviced = req->size;
// No stored hash: read the object data so the hash can be computed.
580 rio->state = READING;
582 XSEGLOG2(&lc, I, "Reading %s", rio->obj_name);
583 if (do_aio_generic(peer, pr, X_READ, rio->obj_name, rio->buf + rio->read,
584 req->size - rio->read, req->offset + rio->read) < 0) {
585 XSEGLOG2(&lc, I, "Reading of %s failed on do_aio_read", rio->obj_name);
590 } else if (rio->state == READING){
591 XSEGLOG2(&lc, I, "Reading of %s callback", rio->obj_name);
593 rio->read += pr->retval;
595 XSEGLOG2(&lc, E, "Reading of %s failed", rio->second_name);
// A read that returned 0 or a full buffer ends the reading phase.
600 if (!pr->retval || rio->read >= req->size) {
601 XSEGLOG2(&lc, I, "Reading of %s completed", rio->obj_name);
602 //rstrip here in case zeros were written in the end
603 for (;trailing_zeros < rio->read; trailing_zeros++)
604 if (rio->buf[rio->read-trailing_zeros -1])
606 XSEGLOG2(&lc, D, "Read %llu, Trainling zeros %llu",
607 rio->read, trailing_zeros);
// Hash only the data without its trailing zero bytes.
609 rio->read -= trailing_zeros;
610 SHA256((unsigned char *) rio->buf, rio->read, sha);
611 hexlify(sha, SHA256_DIGEST_SIZE, rio->second_name);
612 rio->second_name[HEXLIFIED_SHA256_DIGEST_SIZE] = 0;
// NOTE(review): as above, xreply is fetched before the resize.
614 xreply = (struct xseg_reply_hash*)xseg_get_data(peer->xseg, req);
615 r = xseg_resize_request(peer->xseg, pr->req, pr->req->targetlen,
616 sizeof(struct xseg_reply_hash));
617 strncpy(xreply->target, rio->second_name, HEXLIFIED_SHA256_DIGEST_SIZE);
618 xreply->targetlen = HEXLIFIED_SHA256_DIGEST_SIZE;
620 XSEGLOG2(&lc, I, "Calculated %s as hash of %s",
621 rio->second_name, rio->obj_name);
// Check whether an object named by the hash already exists.
625 rio->state = STATING;
626 r = do_aio_generic(peer, pr, X_INFO, rio->second_name, NULL, 0, 0);
628 XSEGLOG2(&lc, E, "Stating %s failed", rio->second_name);
// Partial read: ask for the remainder.
634 XSEGLOG2(&lc, I, "Resubmitting read of %s", rio->obj_name);
635 if (do_aio_generic(peer, pr, X_READ, rio->obj_name, rio->buf + rio->read,
636 req->size - rio->read, req->offset + rio->read) < 0) {
637 XSEGLOG2(&lc, E, "Reading of %s failed on do_aio_read",
643 } else if (rio->state == STATING){
// Stat failed: store the stripped data under the hash name.
646 XSEGLOG2(&lc, I, "Stating %s failed. Writing.",
648 rio->state = WRITING;
649 if (do_aio_generic(peer, pr, X_WRITE, rio->second_name,
650 rio->buf, rio->read, 0) < 0) {
651 XSEGLOG2(&lc, E, "Writing of %s failed on do_aio_write", rio->second_name);
658 XSEGLOG2(&lc, I, "Stating %s completed Successfully."
659 "No need to write.", rio->second_name);
660 XSEGLOG2(&lc, I, "Hash of object %s to object %s completed",
661 rio->obj_name, rio->second_name);
662 req->serviced = req->size;
668 else if (rio->state == WRITING){
669 XSEGLOG2(&lc, I, "Writing of %s callback", rio->obj_name);
670 if (pr->retval == 0) {
671 XSEGLOG2(&lc, I, "Writing of %s completed", rio->second_name);
672 XSEGLOG2(&lc, I, "Hash of object %s to object %s completed",
673 rio->obj_name, rio->second_name);
// Rebuild obj_name + HASH_SUFFIX and store the hex hash there, so that a
// later request can find it in the PREHASHING step.
676 strncpy(hash_name, rio->obj_name, strlen(rio->obj_name));
677 pos += strlen(rio->obj_name);
678 strncpy(hash_name+pos, HASH_SUFFIX, HASH_SUFFIX_LEN);
679 pos += HASH_SUFFIX_LEN;
682 rio->state = POSTHASHING;
683 if (do_aio_generic(peer, pr, X_WRITE, hash_name, rio->second_name,
684 HEXLIFIED_SHA256_DIGEST_SIZE, 0) < 0) {
685 XSEGLOG2(&lc, E, "Writing of %s failed on do_aio_write", hash_name);
692 XSEGLOG2(&lc, E, "Writing of %s failed", rio->obj_name);
693 XSEGLOG2(&lc, E, "Hash of object %s failed",
698 } else if (rio->state == POSTHASHING) {
699 XSEGLOG2(&lc, I, "Writing of prehashed value callback");
700 if (pr->retval == 0) {
701 XSEGLOG2(&lc, I, "Writing of prehashed value completed");
702 XSEGLOG2(&lc, I, "Hash of object %s to object %s completed",
703 rio->obj_name, rio->second_name);
707 XSEGLOG2(&lc, E, "Writing of prehash failed");
709 req->serviced = req->size;
715 XSEGLOG2(&lc, E, "Unknown state");
// Cleanup of the per-request allocation.
723 free(rio->second_name);
726 rio->second_name = NULL;
// Starts a detached thread that runs func with pr as its argument. The
// thread id is stored in rio->tid.
// Returns the result of pthread_create().
736 int spawnthread(struct peerd *peer, struct peer_req *pr,
737 void *(*func)(void *arg))
739 //struct radosd *rados = (struct radosd *) peer->priv;
740 struct rados_io *rio = (struct rados_io *) (pr->priv);
743 pthread_attr_init(&attr);
744 pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
746 return (pthread_create(&rio->tid, &attr, func, (void *) pr));
// Watch callback registered by lock_op() through rados_watch(); arg is the
// peer_req of the waiting acquire request. For an X_ACQUIRE request it
// signals rio->cond, the condition variable lock_op() waits on. The error
// message is presumably logged for any other operation -- TODO confirm.
749 void watch_cb(uint8_t opcode, uint64_t ver, void *arg)
752 struct peer_req *pr = (struct peer_req *)arg;
753 //struct radosd *rados = (struct radosd *) pr->peer->priv;
754 struct rados_io *rio = (struct rados_io *) (pr->priv);
756 if (pr->req->op == X_ACQUIRE){
757 XSEGLOG2(&lc, I, "watch cb signaling rio of %s", rio->obj_name);
758 pthread_cond_signal(&rio->cond);
761 XSEGLOG2(&lc, E, "Invalid req op in watch_cb");
// Thread body started by handle_acquire(); arg is the peer_req. Acquires
// an exclusive rados lock on the lock object of the request and then
// completes the request.
// Unless the request carries XF_NOSYNC, a watch is registered on the lock
// object first and every failed lock attempt is followed by a wait on
// rio->cond, which watch_cb() signals. With XF_NOSYNC a failed attempt is
// logged as an error instead of being retried.
764 void * lock_op(void *arg)
766 struct peer_req *pr = (struct peer_req *)arg;
767 struct radosd *rados = (struct radosd *) pr->peer->priv;
768 struct rados_io *rio = (struct rados_io *) (pr->priv);
// Turn obj_name into the lock object name by appending LOCK_SUFFIX in place.
769 uint32_t len = strlen(rio->obj_name);
770 strncpy(rio->obj_name + len, LOCK_SUFFIX, LOCK_SUFFIX_LEN);
771 rio->obj_name[len + LOCK_SUFFIX_LEN] = 0;
773 XSEGLOG2(&lc, I, "Starting lock op for %s", rio->obj_name);
774 if (!(pr->req->flags & XF_NOSYNC)){
775 if (rados_watch(rados->ioctx, rio->obj_name, 0,
776 &rio->watch_handle, watch_cb, pr) < 0){
777 XSEGLOG2(&lc, E, "Rados watch failed for %s",
784 /* passing flag 1 means renew lock */
785 while(rados_lock_exclusive(rados->ioctx, rio->obj_name, RADOS_LOCK_NAME,
786 RADOS_LOCK_COOKIE, RADOS_LOCK_DESC, NULL, LIBRADOS_LOCK_FLAG_RENEW) < 0){
787 if (pr->req->flags & XF_NOSYNC){
788 XSEGLOG2(&lc, E, "Rados lock failed for %s",
794 XSEGLOG2(&lc, D, "rados lock for %s sleeping",
// NOTE(review): the wait has no predicate, so a notification arriving
// between the failed lock attempt and this wait could be missed -- confirm
// this is acceptable.
796 pthread_mutex_lock(&rio->m);
797 pthread_cond_wait(&rio->cond, &rio->m);
798 pthread_mutex_unlock(&rio->m);
799 XSEGLOG2(&lc, D, "rados lock for %s woke up",
// Lock held: the watch is no longer needed.
803 if (!(pr->req->flags & XF_NOSYNC)){
804 if (rados_unwatch(rados->ioctx, rio->obj_name,
805 rio->watch_handle) < 0){
806 XSEGLOG2(&lc, E, "Rados unwatch failed");
809 XSEGLOG2(&lc, I, "Successfull lock op for %s", rio->obj_name);
810 complete(pr->peer, pr);
// Forcibly breaks the lock RADOS_LOCK_NAME on the object rio->obj_name.
// Lists the current lockers and proceeds only when there is exactly one
// locker, the lock is exclusive and the reported tag equals
// RADOS_LOCK_TAG; the lock of that client is then broken with
// rados_break_lock(). Used by unlock_op() for requests flagged XF_FORCE.
814 int break_lock(struct radosd *rados, struct rados_io *rio)
// Output buffers for rados_list_lockers(), 1024 bytes each to start with.
817 char *tag = NULL, *clients = NULL, *cookies = NULL, *addrs = NULL;
818 size_t tag_len = 1024, clients_len = 1024, cookies_len = 1024;
819 size_t addrs_len = 1024;
823 tag = malloc(sizeof(char) * tag_len);
824 clients = malloc(sizeof(char) * clients_len);
825 cookies = malloc(sizeof(char) * cookies_len);
826 addrs = malloc(sizeof(char) * addrs_len);
827 if (!tag || !clients || !cookies || !addrs) {
828 XSEGLOG2(&lc, E, "Out of memmory");
833 nr_lockers = rados_list_lockers(rados->ioctx, rio->obj_name,
834 RADOS_LOCK_NAME, &exclusive, tag, &tag_len,
835 clients, &clients_len, cookies, &cookies_len,
// -ERANGE is handled separately from the other errors. Presumably the
// buffers were too small and the listing is retried -- TODO confirm.
837 if (nr_lockers < 0 && nr_lockers != -ERANGE) {
838 XSEGLOG2(&lc, E, "Could not list lockers for %s", rio->obj_name);
841 } else if (nr_lockers == -ERANGE) {
// Sanity checks on the reported lock before breaking it.
847 if (nr_lockers != 1) {
848 XSEGLOG2(&lc, E, "Number of lockers for %s != 1 !(%d)",
849 rio->obj_name, nr_lockers);
852 } else if (!exclusive) {
853 XSEGLOG2(&lc, E, "Lock for %s is not exclusive",
857 } else if (strcmp(RADOS_LOCK_TAG, tag)) {
858 XSEGLOG2(&lc, E, "List lockers returned wrong tag "
859 "(\"%s\" vs \"%s\")", tag, RADOS_LOCK_TAG);
// clients holds the identity of the single locker reported above.
863 r = rados_break_lock(rados->ioctx, rio->obj_name,
864 RADOS_LOCK_NAME, clients, RADOS_LOCK_COOKIE);
// Thread body started by handle_release(); arg is the peer_req. Releases
// the rados lock on the lock object of the request, notifies watchers of
// that object and completes the request. Requests flagged XF_FORCE go
// through break_lock(); the others use rados_unlock().
877 void * unlock_op(void *arg)
879 struct peer_req *pr = (struct peer_req *)arg;
880 struct radosd *rados = (struct radosd *) pr->peer->priv;
881 struct rados_io *rio = (struct rados_io *) (pr->priv);
// Turn obj_name into the lock object name by appending LOCK_SUFFIX in place.
882 uint32_t len = strlen(rio->obj_name);
883 strncpy(rio->obj_name + len, LOCK_SUFFIX, LOCK_SUFFIX_LEN);
884 rio->obj_name[len + LOCK_SUFFIX_LEN] = 0;
887 XSEGLOG2(&lc, I, "Starting unlock op for %s", rio->obj_name);
888 if (pr->req->flags & XF_FORCE) {
889 r = break_lock(rados, rio);
892 r = rados_unlock(rados->ioctx, rio->obj_name, RADOS_LOCK_NAME,
895 /* ENOENT means that the lock did not existed.
896 * This still counts as a successfull unlock operation
898 //if (r < 0 && r != -ENOENT){
900 XSEGLOG2(&lc, E, "Rados unlock failed for %s (r: %d)", rio->obj_name, r);
// Notify watchers of the lock object; lock_op() registers such a watch and
// its watch_cb() wakes the waiting thread.
904 if (rados_notify(rados->ioctx, rio->obj_name,
906 XSEGLOG2(&lc, E, "rados notify failed");
908 XSEGLOG2(&lc, I, "Successfull unlock op for %s", rio->obj_name);
909 complete(pr->peer, pr);
// Handles an acquire (lock) request by running lock_op() for it in a
// detached thread; r holds the result of spawnthread().
914 int handle_acquire(struct peerd *peer, struct peer_req *pr)
916 int r = spawnthread(peer, pr, lock_op);
// Handles a release (unlock) request by running unlock_op() for it in a
// detached thread; r holds the result of spawnthread().
923 int handle_release(struct peerd *peer, struct peer_req *pr)
925 int r = spawnthread(peer, pr, unlock_op);
// Peer-specific initialisation. Allocates the struct radosd kept in
// peer->priv, reads the pool name from the --pool argument, connects to
// the rados cluster, opens an io context on the pool and allocates one
// struct rados_io for each of the peer->nr_ops request slots.
// The cluster handle is shut down on the error paths that follow a
// connection attempt.
931 int custom_peer_init(struct peerd *peer, int argc, char *argv[])
934 struct radosd *rados = malloc(sizeof(struct radosd));
935 struct rados_io *rio;
942 BEGIN_READ_ARGS(argc, argv);
943 READ_ARG_STRING("--pool", rados->pool, MAX_POOL_NAME);
// An empty pool name is rejected.
946 if (!rados->pool[0]){
947 XSEGLOG2(&lc, E , "Pool must be provided");
// Cluster handle with default id (NULL) and default configuration file (NULL).
953 if (rados_create(&rados->cluster, NULL) < 0) {
954 XSEGLOG2(&lc, E, "Rados create failed!");
957 if (rados_conf_read_file(rados->cluster, NULL) < 0){
958 XSEGLOG2(&lc, E, "Error reading rados conf files!");
961 if (rados_connect(rados->cluster) < 0) {
962 XSEGLOG2(&lc, E, "Rados connect failed!");
963 rados_shutdown(rados->cluster);
967 if (rados_pool_lookup(rados->cluster, rados->pool) < 0) {
968 XSEGLOG2(&lc, E, "Pool does not exists. Try creating it first");
969 rados_shutdown(rados->cluster);
// NOTE(review): the pool creation lines below follow the shutdown of the
// lookup-failure path -- confirm whether they are live code.
973 if (rados_pool_create(rados->cluster, rados->pool) < 0){
974 XSEGLOG2(&lc, E, "Couldn't create pool %s", rados->pool);
975 rados_shutdown(rados->cluster);
979 XSEGLOG2(&lc, I, "Pool created.");
983 if (rados_ioctx_create(rados->cluster, rados->pool, &(rados->ioctx)) < 0){
984 XSEGLOG2(&lc, E, "ioctx create problem.");
985 rados_shutdown(rados->cluster);
989 peer->priv = (void *) rados;
// Per-request private state, one struct rados_io per request slot.
990 for (i = 0; i < peer->nr_ops; i++) {
991 rio = malloc(sizeof(struct rados_io));
// Release the entries allocated so far; presumably reached when the
// allocation above failed -- TODO confirm.
994 //is this really necessary?
995 for (j = 0; j < i; j++) {
996 free(peer->peer_reqs[j].priv);
1005 rio->second_name = 0;
1006 rio->watch_handle = 0;
1007 pthread_cond_init(&rio->cond, NULL);
1008 pthread_mutex_init(&rio->m, NULL);
1009 peer->peer_reqs[i].priv = (void *) rio;
// Hook for peer-specific argument parsing.
1014 // nothing to do here for now
1015 int custom_arg_parse(int argc, const char *argv[])
// Hook for peer-specific finalisation; takes the peer being finalised.
1020 void custom_peer_finalize(struct peerd *peer)
// Entry point for every request event. It is called with reason
// dispatch_accept for a new request and with dispatch_internal from the
// rados completion callbacks. On accept the request target is copied into
// rio->obj_name (truncated to MAX_OBJ_NAME and NUL-terminated) and the
// state is set to ACCEPTED. The request is then routed by pr->req->op to
// the matching handle_* function.
1025 int dispatch(struct peerd *peer, struct peer_req *pr, struct xseg_request *req,
1026 enum dispatch_reason reason)
1028 struct rados_io *rio = (struct rados_io *) (pr->priv);
1029 char *target = xseg_get_target(peer->xseg, pr->req);
1030 unsigned int end = (pr->req->targetlen > MAX_OBJ_NAME) ?
1031 MAX_OBJ_NAME : pr->req->targetlen;
1033 if (reason == dispatch_accept) {
1034 strncpy(rio->obj_name, target, end);
1035 rio->obj_name[end] = 0;
1036 rio->state = ACCEPTED;
1040 switch (pr->req->op){
1042 handle_read(peer, pr); break;
1044 handle_write(peer, pr); break;
// Delete, info and copy each have a path that defers the request instead
// of handling it; presumably guarded by a condition -- TODO confirm.
1047 defer_request(peer, pr);
1049 handle_delete(peer, pr);
1053 defer_request(peer, pr);
1055 handle_info(peer, pr);
1059 defer_request(peer, pr);
1061 handle_copy(peer, pr);
1064 handle_acquire(peer, pr); break;
1066 handle_release(peer, pr); break;
1068 handle_hash(peer, pr); break;