Fix licence headers in multiple files
[archipelago] / xseg / peers / user / mt-sosd.c
1 /*
2  * Copyright 2012 GRNET S.A. All rights reserved.
3  *
4  * Redistribution and use in source and binary forms, with or
5  * without modification, are permitted provided that the following
6  * conditions are met:
7  *
8  *   1. Redistributions of source code must retain the above
9  *      copyright notice, this list of conditions and the following
10  *      disclaimer.
11  *   2. Redistributions in binary form must reproduce the above
12  *      copyright notice, this list of conditions and the following
13  *      disclaimer in the documentation and/or other materials
14  *      provided with the distribution.
15  *
16  * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17  * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18  * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19  * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20  * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23  * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24  * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25  * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26  * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27  * POSSIBILITY OF SUCH DAMAGE.
28  *
29  * The views and conclusions contained in the software and
30  * documentation are those of the authors and should not be
31  * interpreted as representing official policies, either expressed
32  * or implied, of GRNET S.A.
33  */
34
35 #include <stdio.h>
36 #include <stdlib.h>
37 #include <unistd.h>
38 #include <xseg/xseg.h>
39 #include <peer.h>
40 #include <rados/librados.h>
41 #include <xseg/protocol.h>
42 #include <pthread.h>
43 #include <openssl/sha.h>
44 #include <ctype.h>
45 #include <errno.h>
46
/* Size in bytes of a raw SHA-256 digest, unless a header already set it. */
#if !defined(SHA256_DIGEST_SIZE)
#define SHA256_DIGEST_SIZE 32
#endif
/* Two hexadecimal characters are needed for every digest byte. */
#define HEXLIFIED_SHA256_DIGEST_SIZE (SHA256_DIGEST_SIZE * 2)

/* Suffix appended to an object name to form the name of its lock object. */
#define LOCK_SUFFIX "_lock"
/* Length of LOCK_SUFFIX, not counting the terminating NUL. */
#define LOCK_SUFFIX_LEN 5

/* Longest pool name stored in struct radosd (excluding the NUL). */
#define MAX_POOL_NAME 64
/* Longest object name: a full xseg target plus the lock suffix. */
#define MAX_OBJ_NAME (XSEG_MAX_TARGETLEN + LOCK_SUFFIX_LEN + 1)

/* Fixed arguments handed to the rados locking calls. */
#define RADOS_LOCK_NAME "RadosLock"
/* An alternative cookie value, "Cookie", is currently disabled. */
#define RADOS_LOCK_COOKIE "foo"
#define RADOS_LOCK_TAG ""
#define RADOS_LOCK_DESC ""
64
/* Print the command line options specific to this peer on stderr. */
void custom_peer_usage()
{
	fputs("Custom peer options:\n"
	      "--pool: Rados pool to connect\n"
	      "\n", stderr);
}
71
/*
 * Write the lowercase hexadecimal form of a SHA-256 digest into hex.
 *
 * data must provide SHA256_DIGEST_LENGTH bytes. hex must have room for
 * 2 * SHA256_DIGEST_LENGTH characters plus a terminating NUL byte.
 * Unsafe: neither buffer is bounds checked.
 */
static void hexlify(unsigned char *data, char *hex)
{
	static const char digits[] = "0123456789abcdef";
	char *out = hex;
	int pos;

	for (pos = 0; pos < SHA256_DIGEST_LENGTH; pos++) {
		unsigned char byte = data[pos];

		*out++ = digits[byte >> 4];	/* high nibble first */
		*out++ = digits[byte & 0x0F];
	}
	/* the result is a NUL terminated string */
	*out = '\0';
}
81 /*
82 static void unhexlify(char *hex, unsigned char *data)
83 {
84         int i;
85         char c;
86         for (i=0; i<SHA256_DIGEST_LENGTH; i++){
87                 data[i] = 0;
88                 c = hex[2*i];
89                 if (isxdigit(c)){
90                         if (isdigit(c)){
91                                 c-= '0';
92                         }
93                         else {
94                                 c = tolower(c);
95                                 c = c-'a' + 10;
96                         }
97                 }
98                 else {
99                         c = 0;
100                 }
101                 data[i] |= (c << 4) & 0xF0;
102                 c = hex[2*i+1];
103                 if (isxdigit(c)){
104                         if (isdigit(c)){
105                                 c-= '0';
106                         }
107                         else {
108                                 c = tolower(c);
109                                 c = c-'a' + 10;
110                         }
111                 }
112                 else {
113                         c = 0;
114                 }
115                 data[i] |= c & 0x0F;
116         }
117 }
118 */
119
/*
 * Position of a request in its handler's state machine. The handlers
 * in this file branch on this value each time they are entered.
 */
enum rados_state {
	ACCEPTED,	/* 0: nothing submitted yet; first handler pass */
	PENDING,	/* 1: single operation in flight (delete, info) */
	READING,	/* 2: a read has been submitted */
	WRITING,	/* 3: a write has been submitted */
	STATING		/* 4: snapshot is waiting for a stat result */
};
127
128 struct radosd {
129         rados_t cluster;
130         rados_ioctx_t ioctx;
131         char pool[MAX_POOL_NAME + 1];
132 };
133
134 struct rados_io{
135         char obj_name[MAX_OBJ_NAME + 1];
136         enum rados_state state;
137         uint64_t size;
138         char *second_name, *buf;
139         uint64_t read;
140         uint64_t watch_handle;
141         pthread_t tid;
142         pthread_cond_t cond;
143         pthread_mutex_t m;
144 };
145
146 void rados_ack_cb(rados_completion_t c, void *arg)
147 {
148         struct peer_req *pr = (struct peer_req*) arg;
149         struct peerd *peer = pr->peer;
150         int ret = rados_aio_get_return_value(c);
151         pr->retval = ret;
152         rados_aio_release(c);
153         dispatch(peer, pr, pr->req, dispatch_internal);
154 }
155
156 void rados_commit_cb(rados_completion_t c, void *arg)
157 {
158         struct peer_req *pr = (struct peer_req*) arg;
159         struct peerd *peer = pr->peer;
160         int ret = rados_aio_get_return_value(c);
161         pr->retval = ret;
162         rados_aio_release(c);
163         dispatch(peer, pr, pr->req, dispatch_internal);
164 }
165
166 static int do_aio_generic(struct peerd *peer, struct peer_req *pr, uint32_t op,
167                 char *target, char *buf, uint64_t size, uint64_t offset)
168 {
169         struct radosd *rados = (struct radosd *) peer->priv;
170         struct rados_io *rio = (struct rados_io *) pr->priv;
171         int r;
172
173         rados_completion_t rados_compl;
174         switch (op) {
175                 case X_READ:
176                         r = rados_aio_create_completion(pr, rados_ack_cb, NULL, &rados_compl);
177                         if (r < 0)
178                                 return -1;
179                         r = rados_aio_read(rados->ioctx, target, rados_compl,
180                                         buf, size, offset);
181                         break;
182                 case X_WRITE:
183                         r = rados_aio_create_completion(pr, NULL, rados_commit_cb, &rados_compl);
184                         if (r < 0)
185                                 return -1;
186                         r = rados_aio_write(rados->ioctx, target, rados_compl,
187                                         buf, size, offset);
188                         break;
189                 case X_DELETE:
190                         r = rados_aio_create_completion(pr, rados_ack_cb, NULL, &rados_compl);
191                         if (r < 0)
192                                 return -1;
193                         r = rados_aio_remove(rados->ioctx, target, rados_compl);
194                         break;
195                 case X_INFO:
196                         r = rados_aio_create_completion(pr, rados_ack_cb, NULL, &rados_compl);
197                         if (r < 0)
198                                 return -1;
199                         r = rados_aio_stat(rados->ioctx, target, rados_compl, &rio->size, NULL); 
200                         break;
201                 default:
202                         return -1;
203                         break;
204         }
205         if (r < 0) {
206                 rados_aio_release(rados_compl);
207         }
208         return r;
209 }
210
211 static int do_aio_read(struct peerd *peer, struct peer_req *pr)
212 {
213         struct xseg_request *req = pr->req;
214         struct rados_io *rio = (struct rados_io *) pr->priv;
215         char *data = xseg_get_data(peer->xseg, pr->req);
216
217         return do_aio_generic(peer, pr, X_READ, rio->obj_name,
218                         data + req->serviced,
219                         req->size - req->serviced,
220                         req->offset + req->serviced);
221 }
222
223 static int do_aio_write(struct peerd *peer, struct peer_req *pr)
224 {
225         struct xseg_request *req = pr->req;
226         struct rados_io *rio = (struct rados_io *) pr->priv;
227         char *data = xseg_get_data(peer->xseg, pr->req);
228
229         return do_aio_generic(peer, pr, X_WRITE, rio->obj_name,
230                         data + req->serviced,
231                         req->size - req->serviced,
232                         req->offset + req->serviced);
233 }
234
235 int handle_delete(struct peerd *peer, struct peer_req *pr)
236 {
237         int r;
238         //struct radosd *rados = (struct radosd *) peer->priv;
239         struct rados_io *rio = (struct rados_io *) pr->priv;
240
241         if (rio->state == ACCEPTED) {
242                 XSEGLOG2(&lc, I, "Deleting %s", rio->obj_name);
243                 rio->state = PENDING;
244                 r = do_aio_generic(peer, pr, X_DELETE, rio->obj_name, NULL, 0, 0);
245                 if (r < 0) {
246                         XSEGLOG2(&lc, E, "Deletion of %s failed", rio->obj_name);
247                         fail(peer, pr);
248                 }
249         }
250         else {
251                 if (pr->retval < 0){
252                         XSEGLOG2(&lc, E, "Deletion of %s failed", rio->obj_name);
253                         fail(peer, pr);
254                 }
255                 else {
256                         XSEGLOG2(&lc, I, "Deletion of %s completed", rio->obj_name);
257                         complete(peer, pr);
258                 }
259         }
260         return 0;
261 }
262
263 int handle_info(struct peerd *peer, struct peer_req *pr)
264 {
265         int r;
266         struct xseg_request *req = pr->req;
267         //struct radosd *rados = (struct radosd *) peer->priv;
268         struct rados_io *rio = (struct rados_io *) pr->priv;
269         char *req_data = xseg_get_data(peer->xseg, req);
270         struct xseg_reply_info *xinfo = (struct xseg_reply_info *)req_data;
271
272         if (rio->state == ACCEPTED) {
273                 XSEGLOG2(&lc, I, "Getting info of %s", rio->obj_name);
274                 rio->state = PENDING;
275                 r = do_aio_generic(peer, pr, X_INFO, rio->obj_name, NULL, 0, 0);
276                 if (r < 0) {
277                         XSEGLOG2(&lc, E, "Getting info of %s failed", rio->obj_name);   
278                         fail(peer, pr);
279                 }
280         }
281         else {
282                 if (pr->retval < 0){
283                         xinfo->size = 0;
284                         XSEGLOG2(&lc, E, "Getting info of %s failed", rio->obj_name);   
285                         fail(peer, pr);
286                 }
287                 else {
288                         xinfo->size = rio->size;
289                         pr->retval = sizeof(uint64_t);
290                         XSEGLOG2(&lc, I, "Getting info of %s completed", rio->obj_name);        
291                         complete(peer, pr);
292                 }
293         }
294         return 0;
295 }
296
297 int handle_read(struct peerd *peer, struct peer_req *pr)
298 {
299         struct rados_io *rio = (struct rados_io *) (pr->priv);
300         struct xseg_request *req = pr->req;
301         char *data;
302         if (rio->state == ACCEPTED) {
303                 if (!req->size) {
304                         complete(peer, pr);
305                         return 0;
306                 }
307                 rio->state = READING;
308                 XSEGLOG2(&lc, I, "Reading %s", rio->obj_name);
309                 if (do_aio_read(peer, pr) < 0) {
310                         XSEGLOG2(&lc, I, "Reading of %s failed on do_aio_read",
311                                                 rio->obj_name);
312                         fail(peer, pr);
313                 }
314         }
315         else if (rio->state == READING) {
316                 XSEGLOG2(&lc, I, "Reading of %s callback", rio->obj_name);
317                 data = xseg_get_data(peer->xseg, pr->req);
318                 if (pr->retval > 0)
319                         req->serviced += pr->retval;
320                 else if (pr->retval == 0) {
321                         XSEGLOG2(&lc, I, "Reading of %s reached end of file at "
322                                 "%llu bytes. Zeroing out rest", rio->obj_name,
323                                 (unsigned long long) req->serviced);
324                         /* reached end of object. zero out rest of data
325                          * requested from this object
326                          */
327                         memset(data + req->serviced, 0, req->datalen - req->serviced);
328                         req->serviced = req->datalen;
329                 }
330                 else if (pr->retval == -2) {
331                         XSEGLOG2(&lc, I, "Reading of %s return -2. "
332                                         "Zeroing out data", rio->obj_name);
333                         /* object not found. return zeros instead */
334                         memset(data, 0, req->datalen);
335                         req->serviced = req->datalen;
336                 }
337                 else {
338                         XSEGLOG2(&lc, E, "Reading of %s failed", rio->obj_name);
339                         /* pr->retval < 0 && pr->retval != -2 */
340                         fail(peer, pr);
341                         return 0;
342                 }
343                 if (req->serviced >= req->datalen) {
344                         XSEGLOG2(&lc, I, "Reading of %s completed", rio->obj_name);
345                         complete(peer, pr);
346                         return 0;
347                 }
348
349                 if (!req->size) {
350                         /* should not happen */
351                         fail(peer, pr);
352                         return 0;
353                 }
354                 /* resubmit */
355                 XSEGLOG2(&lc, I, "Resubmitting read of %s", rio->obj_name);
356                 if (do_aio_read(peer, pr) < 0) {
357                         XSEGLOG2(&lc, E, "Reading of %s failed on do_aio_read",
358                                         rio->obj_name);
359                         fail(peer, pr);
360                 }
361         }
362         else {
363                 /* should not reach this */
364                 printf("read request reached this\n");
365                 fail(peer, pr);
366         }
367         return 0;
368 }
369
370 int handle_write(struct peerd *peer, struct peer_req *pr)
371 {
372         struct rados_io *rio = (struct rados_io *) (pr->priv);
373         struct xseg_request *req = pr->req;
374         if (rio->state == ACCEPTED) {
375                 if (!req->size) {
376                         // for future use
377                         if (req->flags & XF_FLUSH) {
378                                 complete(peer, pr);
379                                 return 0;
380                         }
381                         else {
382                                 complete(peer, pr);
383                                 return 0;
384                         }
385                 }
386                 //should we ensure req->op = X_READ ?
387                 rio->state = WRITING;
388                 XSEGLOG2(&lc, I, "Writing %s", rio->obj_name);
389                 if (do_aio_write(peer, pr) < 0) {
390                         XSEGLOG2(&lc, E, "Writing of %s failed on do_aio_write",
391                                         rio->obj_name);
392                         fail(peer, pr);
393                 }
394         }
395         else if (rio->state == WRITING) {
396                 /* rados writes return 0 if write succeeded or < 0 if failed
397                  * no resubmission occurs
398                  */
399                 XSEGLOG2(&lc, I, "Writing of %s callback", rio->obj_name);
400                 if (pr->retval == 0) {
401                         XSEGLOG2(&lc, I, "Writing of %s completed", rio->obj_name);
402                         req->serviced = req->datalen;
403                         complete(peer, pr);
404                         return 0;
405                 }
406                 else {
407                         XSEGLOG2(&lc, E, "Writing of %s failed", rio->obj_name);
408                         fail(peer, pr);
409                         return 0;
410                 }
411         }
412         else {
413                 /* should not reach this */
414                 printf("write request reached this\n");
415                 fail(peer, pr);
416         }
417         return 0;
418 }
419
420 int handle_copy(struct peerd *peer, struct peer_req *pr)
421 {
422         //struct radosd *rados = (struct radosd *) peer->priv;
423         struct xseg_request *req = pr->req;
424         struct rados_io *rio = (struct rados_io *) pr->priv;
425         int r;
426         struct xseg_request_copy *xcopy = (struct xseg_request_copy *)xseg_get_data(peer->xseg, req);
427
428         if (rio->state == ACCEPTED){
429                 XSEGLOG2(&lc, I, "Copy of object %s to object %s started",
430                                 rio->second_name, rio->obj_name);
431                 if (!req->size) {
432                         complete(peer, pr); //or fail?
433                         return 0;
434                 }
435
436                 rio->second_name = malloc(MAX_OBJ_NAME + 1);
437                 if (!rio->second_name){
438                         fail(peer, pr);
439                         return -1;
440                 }
441                 //NULL terminate or fail if targetlen > MAX_OBJ_NAME ?
442                 unsigned int end = (xcopy->targetlen > MAX_OBJ_NAME) ? MAX_OBJ_NAME : xcopy->targetlen;
443                 strncpy(rio->second_name, xcopy->target, end);
444                 rio->second_name[end] = 0;
445
446                 rio->buf = malloc(req->size);
447                 if (!rio->buf) {
448                         r = -1;
449                         goto out_src;
450                 }
451
452                 rio->state = READING;
453                 rio->read = 0;
454                 XSEGLOG2(&lc, I, "Reading %s", rio->second_name);
455                 if (do_aio_generic(peer, pr, X_READ, rio->second_name, rio->buf + rio->read,
456                         req->size - rio->read, req->offset + rio->read) < 0) {
457                         XSEGLOG2(&lc, I, "Reading of %s failed on do_aio_read", rio->obj_name);
458                         fail(peer, pr);
459                         r = -1;
460                         goto out_buf;
461                 }
462         }
463         else if (rio->state == READING){
464                 XSEGLOG2(&lc, I, "Reading of %s callback", rio->obj_name);
465                 if (pr->retval > 0)
466                         rio->read += pr->retval;
467                 else if (pr->retval == 0) {
468                         XSEGLOG2(&lc, I, "Reading of %s reached end of file at "
469                                 "%llu bytes. Zeroing out rest", rio->obj_name,
470                                 (unsigned long long) req->serviced);
471                         memset(rio->buf + rio->read, 0, req->size - rio->read);
472                         rio->read = req->size ;
473                 }
474                 else {
475                         XSEGLOG2(&lc, E, "Reading of %s failed", rio->second_name);
476                         r = -1;
477                         goto out_buf;
478                 }
479
480                 if (rio->read >= req->size) {
481                         XSEGLOG2(&lc, I, "Reading of %s completed", rio->obj_name);
482                         //do_aio_write
483                         rio->state = WRITING;
484                         XSEGLOG2(&lc, I, "Writing %s", rio->obj_name);
485                         if (do_aio_generic(peer, pr, X_WRITE, rio->obj_name,
486                                         rio->buf, req->size, req->offset) < 0) {
487                                 XSEGLOG2(&lc, E, "Writing of %s failed on do_aio_write", rio->obj_name);
488                                 r = -1;
489                                 goto out_buf;
490                         }
491                         return 0;
492                 }
493
494                 XSEGLOG2(&lc, I, "Resubmitting read of %s", rio->obj_name);
495                 if (do_aio_generic(peer, pr, X_READ, rio->second_name, rio->buf + rio->read,
496                         req->size - rio->read, req->offset + rio->read) < 0) {
497                         XSEGLOG2(&lc, E, "Reading of %s failed on do_aio_read",
498                                         rio->obj_name);
499                         r = -1;
500                         goto out_buf;
501                 }
502         }
503         else if (rio->state == WRITING){
504                 XSEGLOG2(&lc, I, "Writing of %s callback", rio->obj_name);
505                 if (pr->retval == 0) {
506                         XSEGLOG2(&lc, I, "Writing of %s completed", rio->obj_name);
507                         XSEGLOG2(&lc, I, "Copy of object %s to object %s completed", rio->second_name, rio->obj_name);
508                         req->serviced = req->size;
509                         r = 0;
510                         goto out_buf;
511                 }
512                 else {
513                         XSEGLOG2(&lc, E, "Writing of %s failed", rio->obj_name);
514                         XSEGLOG2(&lc, E, "Copy of object %s to object %s failed", rio->second_name, rio->obj_name);
515                         r = -1;
516                         goto out_buf;
517                 }
518         }
519         else {
520                 XSEGLOG2(&lc, E, "Unknown state");
521         }
522         return 0;
523
524
525 out_buf:
526         free(rio->buf);
527 out_src:
528         free(rio->second_name);
529
530         rio->buf = NULL;
531         rio->second_name = NULL;
532         rio->read = 0;
533
534         if (r < 0)
535                 fail(peer ,pr);
536         else
537                 complete(peer, pr);
538         return 0;
539 }
540
541 int handle_snapshot(struct peerd *peer, struct peer_req *pr)
542 {
543         //struct radosd *rados = (struct radosd *) peer->priv;
544         struct xseg_request *req = pr->req;
545         struct rados_io *rio = (struct rados_io *) pr->priv;
546         int r;
547
548         if (rio->state == ACCEPTED){
549                 struct xseg_request_snapshot *xsnapshot;
550                 xsnapshot = (struct xseg_request_snapshot *)xseg_get_data(peer->xseg, req);
551                 (void)xsnapshot; //ignore it
552                 XSEGLOG2(&lc, I, "Starting snapshot of object %s", rio->obj_name);
553                 if (!req->size) {
554                         complete(peer, pr); //or fail?
555                         return 0;
556                 }
557
558                 rio->second_name = malloc(MAX_OBJ_NAME+1);
559                 if (!rio->second_name){
560                         return -1;
561                 }
562                 rio->buf = malloc(req->size);
563                 if (!rio->buf) {
564                         r = -1;
565                         goto out_src;
566                 }
567                 rio->state = READING;
568                 rio->read = 0;
569                 XSEGLOG2(&lc, I, "Reading %s", rio->obj_name);
570                 if (do_aio_generic(peer, pr, X_READ, rio->obj_name, rio->buf + rio->read,
571                         req->size - rio->read, req->offset + rio->read) < 0) {
572                         XSEGLOG2(&lc, I, "Reading of %s failed on do_aio_read", rio->obj_name);
573                         fail(peer, pr);
574                         r = -1;
575                         goto out_buf;
576                 }
577         }
578         else if (rio->state == READING){
579                 XSEGLOG2(&lc, I, "Reading of %s callback", rio->obj_name);
580                 if (pr->retval >= 0)
581                         rio->read += pr->retval;
582                 else {
583                         XSEGLOG2(&lc, E, "Reading of %s failed", rio->second_name);
584                         r = -1;
585                         goto out_buf;
586                 }
587
588                 if (!pr->retval || rio->read >= req->size) {
589                         XSEGLOG2(&lc, I, "Reading of %s completed", rio->obj_name);
590                         //rstrip here in case zeros were written in the end
591                         uint64_t trailing_zeros = 0;
592                         for (;trailing_zeros < rio->read; trailing_zeros++)
593                                 if (rio->buf[rio->read-trailing_zeros -1])
594                                         break;
595                         XSEGLOG2(&lc, D, "Read %llu, Trainling zeros %llu",
596                                         rio->read, trailing_zeros);
597
598                         rio->read -= trailing_zeros;
599                         //calculate snapshot name
600                         unsigned char sha[SHA256_DIGEST_SIZE];
601                         SHA256((unsigned char *) rio->buf, rio->read, sha);
602                         r = xseg_resize_request(peer->xseg, pr->req, pr->req->targetlen,
603                                         sizeof(struct xseg_reply_snapshot));
604
605                         struct xseg_reply_snapshot *xreply;
606                         xreply = (struct xseg_reply_snapshot *)xseg_get_data(peer->xseg, req);
607                         hexlify(sha, xreply->target);
608                         xreply->targetlen = HEXLIFIED_SHA256_DIGEST_SIZE;
609
610                         strncpy(rio->second_name, xreply->target, xreply->targetlen);
611                         rio->second_name[xreply->targetlen] = 0;
612                         XSEGLOG2(&lc, I, "Calculated %s as snapshot of %s",
613                                         rio->second_name, rio->obj_name);
614
615                         //aio_stat
616                         rio->state = STATING;
617                         r = do_aio_generic(peer, pr, X_INFO, rio->second_name, NULL, 0, 0);
618                         if (r < 0){
619                                 XSEGLOG2(&lc, E, "Stating %s failed", rio->second_name);
620                                 r = -1;
621                                 goto out_buf;
622                         }
623                         return 0;
624                 }
625                 XSEGLOG2(&lc, I, "Resubmitting read of %s", rio->obj_name);
626                 if (do_aio_generic(peer, pr, X_READ, rio->obj_name, rio->buf + rio->read,
627                         req->size - rio->read, req->offset + rio->read) < 0) {
628                         XSEGLOG2(&lc, E, "Reading of %s failed on do_aio_read",
629                                         rio->obj_name);
630                         r = -1;
631                         goto out_buf;
632                 }
633                 return 0;
634         } else if (rio->state == STATING){
635                 if (pr->retval < 0){
636                         //write
637                         XSEGLOG2(&lc, I, "Stating %s failed. Writing.",
638                                                         rio->second_name);
639                         rio->state = WRITING;
640                         if (do_aio_generic(peer, pr, X_WRITE, rio->second_name,
641                                                 rio->buf, rio->read, 0) < 0) {
642                                 XSEGLOG2(&lc, E, "Writing of %s failed on do_aio_write", rio->second_name);
643                                 r = -1;
644                                 goto out_buf;
645                         }
646                         return 0;
647                 }
648                 else {
649                         XSEGLOG2(&lc, I, "Stating %s completed Successfully."
650                                         "No need to write.", rio->second_name);
651                         XSEGLOG2(&lc, I, "Snapshot of object %s to object %s completed",
652                                         rio->obj_name, rio->second_name);
653                         req->serviced = req->size;
654                         r = 0;
655                         goto out_buf;
656                 }
657
658         }
659         else if (rio->state == WRITING){
660                 struct xseg_reply_snapshot *xreply;
661                 xreply = (struct xseg_reply_snapshot *)xseg_get_data(peer->xseg, req);
662                 (void)xreply;
663                 XSEGLOG2(&lc, I, "Writing of %s callback", rio->obj_name);
664                 if (pr->retval == 0) {
665                         XSEGLOG2(&lc, I, "Writing of %s completed", rio->second_name);
666                         XSEGLOG2(&lc, I, "Snapshot of object %s to object %s completed",
667                                         rio->obj_name, rio->second_name);
668                         req->serviced = req->size;
669                         r = 0;
670                         goto out_buf;
671                 }
672                 else {
673                         XSEGLOG2(&lc, E, "Writing of %s failed", rio->obj_name);
674                         XSEGLOG2(&lc, E, "Snapshot of object %s failed",
675                                                                 rio->obj_name);
676                         r = -1;
677                         goto out_buf;
678                 }
679         }
680         else {
681                 XSEGLOG2(&lc, E, "Unknown state");
682         }
683         return 0;
684
685
686 out_buf:
687         free(rio->buf);
688 out_src:
689         free(rio->second_name);
690
691         rio->buf = NULL;
692         rio->second_name = NULL;
693         rio->read = 0;
694
695         if (r < 0)
696                 fail(peer ,pr);
697         else
698                 complete(peer, pr);
699         return 0;
700 }
701
702 int spawnthread(struct peerd *peer, struct peer_req *pr,
703                         void *(*func)(void *arg))
704 {
705         //struct radosd *rados = (struct radosd *) peer->priv;
706         struct rados_io *rio = (struct rados_io *) (pr->priv);
707
708         pthread_attr_t attr;
709         pthread_attr_init(&attr);
710         pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
711
712         return (pthread_create(&rio->tid, &attr, func, (void *) pr));
713 }
714
715 void watch_cb(uint8_t opcode, uint64_t ver, void *arg)
716 {
717         //assert pr valid
718         struct peer_req *pr = (struct peer_req *)arg;
719         //struct radosd *rados = (struct radosd *) pr->peer->priv;
720         struct rados_io *rio = (struct rados_io *) (pr->priv);
721
722         if (pr->req->op == X_ACQUIRE){
723                 XSEGLOG2(&lc, I, "watch cb signaling rio of %s", rio->obj_name);
724                 pthread_cond_signal(&rio->cond);
725         }
726         else
727                 XSEGLOG2(&lc, E, "Invalid req op in watch_cb");
728 }
729
730 void * lock_op(void *arg)
731 {
732         struct peer_req *pr = (struct peer_req *)arg;
733         struct radosd *rados = (struct radosd *) pr->peer->priv;
734         struct rados_io *rio = (struct rados_io *) (pr->priv);
735         uint32_t len = strlen(rio->obj_name);
736         strncpy(rio->obj_name + len, LOCK_SUFFIX, LOCK_SUFFIX_LEN);
737         rio->obj_name[len + LOCK_SUFFIX_LEN] = 0;
738
739         XSEGLOG2(&lc, I, "Starting lock op for %s", rio->obj_name);
740         if (!(pr->req->flags & XF_NOSYNC)){
741                 if (rados_watch(rados->ioctx, rio->obj_name, 0,
742                                 &rio->watch_handle, watch_cb, pr) < 0){
743                         XSEGLOG2(&lc, E, "Rados watch failed for %s",
744                                         rio->obj_name);
745                         fail(pr->peer, pr);
746                         return NULL;
747                 }
748         }
749
750         /* passing flag 1 means renew lock */
751         while(rados_lock_exclusive(rados->ioctx, rio->obj_name, RADOS_LOCK_NAME,
752                 RADOS_LOCK_COOKIE, RADOS_LOCK_DESC, NULL, LIBRADOS_LOCK_FLAG_RENEW) < 0){
753                 if (pr->req->flags & XF_NOSYNC){
754                         XSEGLOG2(&lc, E, "Rados lock failed for %s",
755                                         rio->obj_name);
756                         fail(pr->peer, pr);
757                         return NULL;
758                 }
759                 else{
760                         XSEGLOG2(&lc, D, "rados lock for %s sleeping",
761                                         rio->obj_name);
762                         pthread_mutex_lock(&rio->m);
763                         pthread_cond_wait(&rio->cond, &rio->m);
764                         pthread_mutex_unlock(&rio->m);
765                         XSEGLOG2(&lc, D, "rados lock for %s woke up",
766                                         rio->obj_name);
767                 }
768         }
769         if (!(pr->req->flags & XF_NOSYNC)){
770                 if (rados_unwatch(rados->ioctx, rio->obj_name,
771                                         rio->watch_handle) < 0){
772                         XSEGLOG2(&lc, E, "Rados unwatch failed");
773                 }
774         }
775         XSEGLOG2(&lc, I, "Successfull lock op for %s", rio->obj_name);
776         complete(pr->peer, pr);
777         return NULL;
778 }
779
780 int break_lock(struct radosd *rados, struct rados_io *rio)
781 {
782         int r, exclusive;
783         char *tag = NULL, *clients = NULL, *cookies = NULL, *addrs = NULL;
784         size_t tag_len = 1024, clients_len = 1024, cookies_len = 1024;
785         size_t addrs_len = 1024;
786         ssize_t nr_lockers;
787
788         for (;;) {
789                 tag = malloc(sizeof(char) * tag_len);
790                 clients = malloc(sizeof(char) * clients_len);
791                 cookies = malloc(sizeof(char) * cookies_len);
792                 addrs = malloc(sizeof(char) * addrs_len);
793                 if (!tag || !clients || !cookies || !addrs) {
794                         XSEGLOG2(&lc, E, "Out of memmory");
795                         r = -1;
796                         break;
797                 }
798
799                 nr_lockers = rados_list_lockers(rados->ioctx, rio->obj_name,
800                                 RADOS_LOCK_NAME, &exclusive, tag, &tag_len,
801                                 clients, &clients_len, cookies, &cookies_len,
802                                 addrs, &addrs_len);
803                 if (nr_lockers < 0 && nr_lockers != -ERANGE) {
804                         XSEGLOG2(&lc, E, "Could not list lockers for %s", rio->obj_name);
805                         r = -1;
806                         break;
807                 } else if (nr_lockers == -ERANGE) {
808                         free(tag);
809                         free(clients);
810                         free(cookies);
811                         free(addrs);
812                 } else {
813                         if (nr_lockers != 1) {
814                                 XSEGLOG2(&lc, E, "Number of lockers for %s != 1 !(%d)",
815                                                 rio->obj_name, nr_lockers);
816                                 r = -1;
817                                 break;
818                         } else if (!exclusive) {
819                                 XSEGLOG2(&lc, E, "Lock for %s is not exclusive",
820                                                 rio->obj_name);
821                                 r = -1;
822                                 break;
823                         } else if (strcmp(RADOS_LOCK_TAG, tag)) {
824                                 XSEGLOG2(&lc, E, "List lockers returned wrong tag "
825                                                 "(\"%s\" vs \"%s\")", tag, RADOS_LOCK_TAG);
826                                 r = -1;
827                                 break;
828                         }
829                         r = rados_break_lock(rados->ioctx, rio->obj_name,
830                                 RADOS_LOCK_NAME, clients, RADOS_LOCK_COOKIE);
831                         break;
832                 }
833         }
834
835         free(tag);
836         free(clients);
837         free(cookies);
838         free(addrs);
839
840         return r;
841 }
842
843 void * unlock_op(void *arg)
844 {
845         struct peer_req *pr = (struct peer_req *)arg;
846         struct radosd *rados = (struct radosd *) pr->peer->priv;
847         struct rados_io *rio = (struct rados_io *) (pr->priv);
848         uint32_t len = strlen(rio->obj_name);
849         strncpy(rio->obj_name + len, LOCK_SUFFIX, LOCK_SUFFIX_LEN);
850         rio->obj_name[len + LOCK_SUFFIX_LEN] = 0;
851         int r;
852
853         XSEGLOG2(&lc, I, "Starting unlock op for %s", rio->obj_name);
854         if (pr->req->flags & XF_FORCE) {
855                 r = break_lock(rados, rio);
856         }
857         else {
858                 r = rados_unlock(rados->ioctx, rio->obj_name, RADOS_LOCK_NAME,
859                         RADOS_LOCK_COOKIE);
860         }
861         /* ENOENT means that the lock did not existed.
862          * This still counts as a successfull unlock operation
863          */
864         //if (r < 0 && r != -ENOENT){
865         if (r < 0){
866                 XSEGLOG2(&lc, E, "Rados unlock failed for %s (r: %d)", rio->obj_name, r);
867                 fail(pr->peer, pr);
868         }
869         else {
870                 if (rados_notify(rados->ioctx, rio->obj_name, 
871                                         0, NULL, 0) < 0) {
872                         XSEGLOG2(&lc, E, "rados notify failed");
873                 }
874                 XSEGLOG2(&lc, I, "Successfull unlock op for %s", rio->obj_name);
875                 complete(pr->peer, pr);
876         }
877         return NULL;
878 }
879
880 int handle_acquire(struct peerd *peer, struct peer_req *pr)
881 {
882         int r = spawnthread(peer, pr, lock_op);
883         if (r < 0)
884                 fail(pr->peer, pr);
885         return 0;
886 }
887
888
889 int handle_release(struct peerd *peer, struct peer_req *pr)
890 {
891         int r = spawnthread(peer, pr, unlock_op);
892         if (r < 0)
893                 fail(pr->peer, pr);
894         return 0;
895 }
896
897 int custom_peer_init(struct peerd *peer, int argc, char *argv[])
898 {
899         int i, j;
900         struct radosd *rados = malloc(sizeof(struct radosd));
901         struct rados_io *rio;
902         if (!rados) {
903                 perror("malloc");
904                 return -1;
905         }
906         rados->pool[0] = 0;
907
908         BEGIN_READ_ARGS(argc, argv);
909         READ_ARG_STRING("--pool", rados->pool, MAX_POOL_NAME);
910         END_READ_ARGS();
911
912         if (!rados->pool[0]){
913                 XSEGLOG2(&lc, E , "Pool must be provided");
914                 free(rados);
915                 usage(argv[0]);
916                 return -1;
917         }
918
919         if (rados_create(&rados->cluster, NULL) < 0) {
920                 XSEGLOG2(&lc, E, "Rados create failed!");
921                 return -1;
922         }
923         if (rados_conf_read_file(rados->cluster, NULL) < 0){
924                 XSEGLOG2(&lc, E, "Error reading rados conf files!");
925                 return -1;
926         }
927         if (rados_connect(rados->cluster) < 0) {
928                 XSEGLOG2(&lc, E, "Rados connect failed!");
929                 rados_shutdown(rados->cluster);
930                 free(rados);
931                 return 0;
932         }
933         if (rados_pool_lookup(rados->cluster, rados->pool) < 0) {
934                 XSEGLOG2(&lc, E, "Pool does not exists. Try creating it first");
935                 rados_shutdown(rados->cluster);
936                 free(rados);
937                 return -1;
938                 /*
939                 if (rados_pool_create(rados->cluster, rados->pool) < 0){
940                         XSEGLOG2(&lc, E, "Couldn't create pool %s", rados->pool);
941                         rados_shutdown(rados->cluster);
942                         free(rados);
943                         return -1;
944                 }
945                 XSEGLOG2(&lc, I, "Pool created.");
946                 */
947
948         }
949         if (rados_ioctx_create(rados->cluster, rados->pool, &(rados->ioctx)) < 0){
950                 XSEGLOG2(&lc, E, "ioctx create problem.");
951                 rados_shutdown(rados->cluster);
952                 free(rados);
953                 return -1;
954         }
955         peer->priv = (void *) rados;
956         for (i = 0; i < peer->nr_ops; i++) {
957                 rio = malloc(sizeof(struct rados_io));
958                 if (!rio) {
959                         //ugly
960                         //is this really necessary?
961                         for (j = 0; j < i; j++) {
962                                 free(peer->peer_reqs[j].priv);
963                         }
964                         free(rados);
965                         perror("malloc");
966                         return -1;
967                 }
968                 rio->buf = 0;
969                 rio->read = 0;
970                 rio->size = 0;
971                 rio->second_name = 0;
972                 rio->watch_handle = 0;
973                 pthread_cond_init(&rio->cond, NULL);
974                 pthread_mutex_init(&rio->m, NULL);
975                 peer->peer_reqs[i].priv = (void *) rio;
976         }
977         return 0;
978 }
979
/*
 * Hook for peer-specific argument parsing.
 *
 * Nothing is parsed here for now: both arguments are ignored and 0 is
 * returned.
 */
int custom_arg_parse(int argc, const char *argv[])
{
        (void) argc;
        (void) argv;

        return 0;
}
985
/*
 * Hook for peer-specific teardown. It currently does nothing; peer is
 * ignored.
 */
void custom_peer_finalize(struct peerd *peer)
{
        (void) peer;
}
990
991 int dispatch(struct peerd *peer, struct peer_req *pr, struct xseg_request *req,
992                 enum dispatch_reason reason)
993 {
994         struct rados_io *rio = (struct rados_io *) (pr->priv);
995         char *target = xseg_get_target(peer->xseg, pr->req);
996         unsigned int end = (pr->req->targetlen > MAX_OBJ_NAME) ? MAX_OBJ_NAME : pr->req->targetlen;
997         strncpy(rio->obj_name, target, end);
998         rio->obj_name[end] = 0;
999         //log_pr("dispatch", pr);
1000         if (reason == dispatch_accept)
1001                 rio->state = ACCEPTED;
1002
1003         switch (pr->req->op){
1004                 case X_READ:
1005                         handle_read(peer, pr); break;
1006                 case X_WRITE: 
1007                         handle_write(peer, pr); break;
1008                 case X_DELETE:
1009                         if (canDefer(peer))
1010                                 defer_request(peer, pr);
1011                         else
1012                                 handle_delete(peer, pr);
1013                         break;
1014                 case X_INFO:
1015                         if (canDefer(peer))
1016                                 defer_request(peer, pr);
1017                         else
1018                                 handle_info(peer, pr);
1019                         break;
1020                 case X_COPY:
1021                         if (canDefer(peer))
1022                                 defer_request(peer, pr);
1023                         else
1024                                 handle_copy(peer, pr);
1025                         break;
1026                 case X_ACQUIRE:
1027                         handle_acquire(peer, pr); break;
1028                 case X_RELEASE:
1029                         handle_release(peer, pr); break;
1030                 case X_SNAPSHOT:
1031                         handle_snapshot(peer, pr); break;
1032
1033                 default:
1034                         fail(peer, pr);
1035         }
1036         return 0;
1037 }