b829ccf215ab370c67e0a9388146c81d5ce6ec8c
[archipelago] / xseg / peers / user / mt-sosd.c
1 /*
2  * Copyright 2012 GRNET S.A. All rights reserved.
3  *
4  * Redistribution and use in source and binary forms, with or
5  * without modification, are permitted provided that the following
6  * conditions are met:
7  *
8  *   1. Redistributions of source code must retain the above
9  *      copyright notice, this list of conditions and the following
10  *      disclaimer.
11  *   2. Redistributions in binary form must reproduce the above
12  *      copyright notice, this list of conditions and the following
13  *      disclaimer in the documentation and/or other materials
14  *      provided with the distribution.
15  *
16  * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17  * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18  * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19  * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20  * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23  * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24  * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25  * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26  * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27  * POSSIBILITY OF SUCH DAMAGE.
28  *
29  * The views and conclusions contained in the software and
30  * documentation are those of the authors and should not be
31  * interpreted as representing official policies, either expressed
32  * or implied, of GRNET S.A.
33  */
34
35 #include <stdio.h>
36 #include <stdlib.h>
37 #include <unistd.h>
38 #include <xseg/xseg.h>
39 #include <peer.h>
40 #include <rados/librados.h>
41 #include <xseg/protocol.h>
42 #include <pthread.h>
43 #include <openssl/sha.h>
44
/* Size in bytes of a raw SHA-256 digest. */
#ifndef SHA256_DIGEST_SIZE
#define SHA256_DIGEST_SIZE 32
#endif
/* hex representation of sha256 value takes up double the sha256 size */
#define HEXLIFIED_SHA256_DIGEST_SIZE (SHA256_DIGEST_SIZE << 1)


/* Suffix appended to a target name to obtain the name of its lock object. */
#define LOCK_SUFFIX "_lock"
/*
 * Length of LOCK_SUFFIX without the terminating NUL. Derived from the
 * literal so the two can never drift apart; kept as an int like the
 * constant it replaces.
 */
#define LOCK_SUFFIX_LEN ((int) (sizeof(LOCK_SUFFIX) - 1))

/* Longest pool name, not counting the terminating NUL. */
#define MAX_POOL_NAME 64
/* Longest object name: a full xseg target name followed by the lock suffix. */
#define MAX_OBJ_NAME (XSEG_MAX_TARGETLEN + LOCK_SUFFIX_LEN + 1)
/* Name and cookie handed to the rados lock/unlock calls in this file. */
#define RADOS_LOCK_NAME "RadosLock"
#define RADOS_LOCK_COOKIE "foo"
60
/* Print the command line options that are specific to this peer on stderr. */
void custom_peer_usage()
{
        static const char usage_text[] =
                "Custom peer options:\n"
                "--pool: Rados pool to connect\n"
                "\n";

        fprintf(stderr, "%s", usage_text);
}
67
/*
 * Render a raw SHA-256 digest as lowercase hexadecimal text.
 *
 * Reads SHA256_DIGEST_LENGTH bytes from data and stores two characters per
 * byte in hex, followed by a terminating NUL. No bounds are checked: hex
 * must have room for 2 * SHA256_DIGEST_LENGTH + 1 bytes.
 */
static void hexlify(unsigned char *data, char *hex)
{
        static const char digits[] = "0123456789abcdef";
        char *out = hex;
        int pos;

        for (pos = 0; pos < SHA256_DIGEST_LENGTH; pos++) {
                *out++ = digits[data[pos] >> 4];
                *out++ = digits[data[pos] & 0x0F];
        }
        *out = '\0';
}
77
/*
 * Value of a single hexadecimal digit, upper or lower case.
 * Any other character decodes as 0.
 *
 * Plain range comparisons are used instead of the <ctype.h> classifiers:
 * those are undefined for negative char values and depend on the locale.
 */
static unsigned char unhexlify_nibble(char c)
{
        if (c >= '0' && c <= '9')
                return (unsigned char) (c - '0');
        if (c >= 'a' && c <= 'f')
                return (unsigned char) (c - 'a' + 10);
        if (c >= 'A' && c <= 'F')
                return (unsigned char) (c - 'A' + 10);
        return 0;
}

/*
 * Inverse of hexlify(): decode 2 * SHA256_DIGEST_LENGTH hex characters from
 * hex into SHA256_DIGEST_LENGTH bytes of data.
 *
 * The first character of each pair is the high nibble. Characters that are
 * not hex digits contribute a zero nibble rather than an error. No bounds
 * are checked and hex does not need to be NUL terminated.
 */
static void unhexlify(char *hex, unsigned char *data)
{
        int i;

        for (i = 0; i < SHA256_DIGEST_LENGTH; i++) {
                unsigned char high = unhexlify_nibble(hex[2 * i]);
                unsigned char low = unhexlify_nibble(hex[2 * i + 1]);

                data[i] = (unsigned char) ((high << 4) | low);
        }
}
114
115
/*
 * Position of a request inside the handlers' state machines.
 * The numeric values run from 0 (ACCEPTED) to 4 (STATING).
 */
enum rados_state {
        ACCEPTED,       /* 0: state the handlers test for before issuing their first operation */
        PENDING,        /* 1: set by handle_delete/handle_info once their operation is submitted */
        READING,        /* 2: an asynchronous read is outstanding */
        WRITING,        /* 3: an asynchronous write is outstanding */
        STATING         /* 4: an asynchronous stat is outstanding (snapshot path) */
};
123
124 struct radosd {
125         rados_t cluster;
126         rados_ioctx_t ioctx;
127         char pool[MAX_POOL_NAME + 1];
128 };
129
130 struct rados_io{
131         char obj_name[MAX_OBJ_NAME + 1];
132         enum rados_state state;
133         uint64_t size;
134         char *second_name, *buf;
135         uint64_t read;
136         uint64_t watch_handle;
137         pthread_t tid;
138         pthread_cond_t cond;
139         pthread_mutex_t m;
140 };
141
142 void rados_ack_cb(rados_completion_t c, void *arg)
143 {
144         struct peer_req *pr = (struct peer_req*) arg;
145         struct peerd *peer = pr->peer;
146         int ret = rados_aio_get_return_value(c);
147         pr->retval = ret;
148         rados_aio_release(c);
149         dispatch(peer, pr, pr->req, dispatch_internal);
150 }
151
152 void rados_commit_cb(rados_completion_t c, void *arg)
153 {
154         struct peer_req *pr = (struct peer_req*) arg;
155         struct peerd *peer = pr->peer;
156         int ret = rados_aio_get_return_value(c);
157         pr->retval = ret;
158         rados_aio_release(c);
159         dispatch(peer, pr, pr->req, dispatch_internal);
160 }
161
162 static int do_aio_generic(struct peerd *peer, struct peer_req *pr, uint32_t op,
163                 char *target, char *buf, uint64_t size, uint64_t offset)
164 {
165         struct radosd *rados = (struct radosd *) peer->priv;
166         struct rados_io *rio = (struct rados_io *) pr->priv;
167         int r;
168
169         rados_completion_t rados_compl;
170         switch (op) {
171                 case X_READ:
172                         r = rados_aio_create_completion(pr, rados_ack_cb, NULL, &rados_compl);
173                         if (r < 0)
174                                 return -1;
175                         r = rados_aio_read(rados->ioctx, target, rados_compl,
176                                         buf, size, offset);
177                         break;
178                 case X_WRITE:
179                         r = rados_aio_create_completion(pr, NULL, rados_commit_cb, &rados_compl);
180                         if (r < 0)
181                                 return -1;
182                         r = rados_aio_write(rados->ioctx, target, rados_compl,
183                                         buf, size, offset);
184                         break;
185                 case X_DELETE:
186                         r = rados_aio_create_completion(pr, rados_ack_cb, NULL, &rados_compl);
187                         if (r < 0)
188                                 return -1;
189                         r = rados_aio_remove(rados->ioctx, target, rados_compl);
190                         break;
191                 case X_INFO:
192                         r = rados_aio_create_completion(pr, rados_ack_cb, NULL, &rados_compl);
193                         if (r < 0)
194                                 return -1;
195                         r = rados_aio_stat(rados->ioctx, target, rados_compl, &rio->size, NULL); 
196                         break;
197                 default:
198                         return -1;
199                         break;
200         }
201         if (r < 0) {
202                 rados_aio_release(rados_compl);
203         }
204         return r;
205 }
206
207 static int do_aio_read(struct peerd *peer, struct peer_req *pr)
208 {
209         struct xseg_request *req = pr->req;
210         struct rados_io *rio = (struct rados_io *) pr->priv;
211         char *data = xseg_get_data(peer->xseg, pr->req);
212
213         return do_aio_generic(peer, pr, X_READ, rio->obj_name,
214                         data + req->serviced,
215                         req->size - req->serviced,
216                         req->offset + req->serviced);
217 }
218
219 static int do_aio_write(struct peerd *peer, struct peer_req *pr)
220 {
221         struct xseg_request *req = pr->req;
222         struct rados_io *rio = (struct rados_io *) pr->priv;
223         char *data = xseg_get_data(peer->xseg, pr->req);
224
225         return do_aio_generic(peer, pr, X_WRITE, rio->obj_name,
226                         data + req->serviced,
227                         req->size - req->serviced,
228                         req->offset + req->serviced);
229 }
230
231 int handle_delete(struct peerd *peer, struct peer_req *pr)
232 {
233         int r;
234         //struct radosd *rados = (struct radosd *) peer->priv;
235         struct rados_io *rio = (struct rados_io *) pr->priv;
236
237         if (rio->state == ACCEPTED) {
238                 XSEGLOG2(&lc, I, "Deleting %s", rio->obj_name);
239                 rio->state = PENDING;
240                 r = do_aio_generic(peer, pr, X_DELETE, rio->obj_name, NULL, 0, 0);
241                 if (r < 0) {
242                         XSEGLOG2(&lc, E, "Deletion of %s failed", rio->obj_name);
243                         fail(peer, pr);
244                 }
245         }
246         else {
247                 if (pr->retval < 0){
248                         XSEGLOG2(&lc, E, "Deletion of %s failed", rio->obj_name);
249                         fail(peer, pr);
250                 }
251                 else {
252                         XSEGLOG2(&lc, I, "Deletion of %s completed", rio->obj_name);
253                         complete(peer, pr);
254                 }
255         }
256         return 0;
257 }
258
259 int handle_info(struct peerd *peer, struct peer_req *pr)
260 {
261         int r;
262         struct xseg_request *req = pr->req;
263         //struct radosd *rados = (struct radosd *) peer->priv;
264         struct rados_io *rio = (struct rados_io *) pr->priv;
265         char *req_data = xseg_get_data(peer->xseg, req);
266         struct xseg_reply_info *xinfo = (struct xseg_reply_info *)req_data;
267
268         if (rio->state == ACCEPTED) {
269                 XSEGLOG2(&lc, I, "Getting info of %s", rio->obj_name);
270                 rio->state = PENDING;
271                 r = do_aio_generic(peer, pr, X_INFO, rio->obj_name, NULL, 0, 0);
272                 if (r < 0) {
273                         XSEGLOG2(&lc, E, "Getting info of %s failed", rio->obj_name);   
274                         fail(peer, pr);
275                 }
276         }
277         else {
278                 if (pr->retval < 0){
279                         xinfo->size = 0;
280                         XSEGLOG2(&lc, E, "Getting info of %s failed", rio->obj_name);   
281                         fail(peer, pr);
282                 }
283                 else {
284                         xinfo->size = rio->size;
285                         pr->retval = sizeof(uint64_t);
286                         XSEGLOG2(&lc, I, "Getting info of %s completed", rio->obj_name);        
287                         complete(peer, pr);
288                 }
289         }
290         return 0;
291 }
292
293 int handle_read(struct peerd *peer, struct peer_req *pr)
294 {
295         struct rados_io *rio = (struct rados_io *) (pr->priv);
296         struct xseg_request *req = pr->req;
297         char *data;
298         if (rio->state == ACCEPTED) {
299                 if (!req->size) {
300                         complete(peer, pr);
301                         return 0;
302                 }
303                 rio->state = READING;
304                 XSEGLOG2(&lc, I, "Reading %s", rio->obj_name);
305                 if (do_aio_read(peer, pr) < 0) {
306                         XSEGLOG2(&lc, I, "Reading of %s failed on do_aio_read",
307                                                 rio->obj_name);
308                         fail(peer, pr);
309                 }
310         }
311         else if (rio->state == READING) {
312                 XSEGLOG2(&lc, I, "Reading of %s callback", rio->obj_name);
313                 data = xseg_get_data(peer->xseg, pr->req);
314                 if (pr->retval > 0)
315                         req->serviced += pr->retval;
316                 else if (pr->retval == 0) {
317                         XSEGLOG2(&lc, I, "Reading of %s reached end of file at "
318                                 "%llu bytes. Zeroing out rest", rio->obj_name,
319                                 (unsigned long long) req->serviced);
320                         /* reached end of object. zero out rest of data
321                          * requested from this object
322                          */
323                         memset(data + req->serviced, 0, req->datalen - req->serviced);
324                         req->serviced = req->datalen;
325                 }
326                 else if (pr->retval == -2) {
327                         XSEGLOG2(&lc, I, "Reading of %s return -2. "
328                                         "Zeroing out data", rio->obj_name);
329                         /* object not found. return zeros instead */
330                         memset(data, 0, req->datalen);
331                         req->serviced = req->datalen;
332                 }
333                 else {
334                         XSEGLOG2(&lc, E, "Reading of %s failed", rio->obj_name);
335                         /* pr->retval < 0 && pr->retval != -2 */
336                         fail(peer, pr);
337                         return 0;
338                 }
339                 if (req->serviced >= req->datalen) {
340                         XSEGLOG2(&lc, I, "Reading of %s completed", rio->obj_name);
341                         complete(peer, pr);
342                         return 0;
343                 }
344
345                 if (!req->size) {
346                         /* should not happen */
347                         fail(peer, pr);
348                         return 0;
349                 }
350                 /* resubmit */
351                 XSEGLOG2(&lc, I, "Resubmitting read of %s", rio->obj_name);
352                 if (do_aio_read(peer, pr) < 0) {
353                         XSEGLOG2(&lc, E, "Reading of %s failed on do_aio_read",
354                                         rio->obj_name);
355                         fail(peer, pr);
356                 }
357         }
358         else {
359                 /* should not reach this */
360                 printf("read request reached this\n");
361                 fail(peer, pr);
362         }
363         return 0;
364 }
365
366 int handle_write(struct peerd *peer, struct peer_req *pr)
367 {
368         struct rados_io *rio = (struct rados_io *) (pr->priv);
369         struct xseg_request *req = pr->req;
370         if (rio->state == ACCEPTED) {
371                 if (!req->size) {
372                         // for future use
373                         if (req->flags & XF_FLUSH) {
374                                 complete(peer, pr);
375                                 return 0;
376                         }
377                         else {
378                                 complete(peer, pr);
379                                 return 0;
380                         }
381                 }
382                 //should we ensure req->op = X_READ ?
383                 rio->state = WRITING;
384                 XSEGLOG2(&lc, I, "Writing %s", rio->obj_name);
385                 if (do_aio_write(peer, pr) < 0) {
386                         XSEGLOG2(&lc, E, "Writing of %s failed on do_aio_write",
387                                         rio->obj_name);
388                         fail(peer, pr);
389                 }
390         }
391         else if (rio->state == WRITING) {
392                 /* rados writes return 0 if write succeeded or < 0 if failed
393                  * no resubmission occurs
394                  */
395                 XSEGLOG2(&lc, I, "Writing of %s callback", rio->obj_name);
396                 if (pr->retval == 0) {
397                         XSEGLOG2(&lc, I, "Writing of %s completed", rio->obj_name);
398                         req->serviced = req->datalen;
399                         complete(peer, pr);
400                         return 0;
401                 }
402                 else {
403                         XSEGLOG2(&lc, E, "Writing of %s failed", rio->obj_name);
404                         fail(peer, pr);
405                         return 0;
406                 }
407         }
408         else {
409                 /* should not reach this */
410                 printf("write request reached this\n");
411                 fail(peer, pr);
412         }
413         return 0;
414 }
415
416 int handle_copy(struct peerd *peer, struct peer_req *pr)
417 {
418         //struct radosd *rados = (struct radosd *) peer->priv;
419         struct xseg_request *req = pr->req;
420         struct rados_io *rio = (struct rados_io *) pr->priv;
421         int r;
422         struct xseg_request_copy *xcopy = (struct xseg_request_copy *)xseg_get_data(peer->xseg, req);
423
424         if (rio->state == ACCEPTED){
425                 XSEGLOG2(&lc, I, "Copy of object %s to object %s started",
426                                 rio->second_name, rio->obj_name);
427                 if (!req->size) {
428                         complete(peer, pr); //or fail?
429                         return 0;
430                 }
431
432                 rio->second_name = malloc(MAX_OBJ_NAME + 1);
433                 if (!rio->second_name){
434                         fail(peer, pr);
435                         return -1;
436                 }
437                 //NULL terminate or fail if targetlen > MAX_OBJ_NAME ?
438                 unsigned int end = (xcopy->targetlen > MAX_OBJ_NAME) ? MAX_OBJ_NAME : xcopy->targetlen;
439                 strncpy(rio->second_name, xcopy->target, end);
440                 rio->second_name[end] = 0;
441
442                 rio->buf = malloc(req->size);
443                 if (!rio->buf) {
444                         r = -1;
445                         goto out_src;
446                 }
447
448                 rio->state = READING;
449                 rio->read = 0;
450                 XSEGLOG2(&lc, I, "Reading %s", rio->second_name);
451                 if (do_aio_generic(peer, pr, X_READ, rio->second_name, rio->buf + rio->read,
452                         req->size - rio->read, req->offset + rio->read) < 0) {
453                         XSEGLOG2(&lc, I, "Reading of %s failed on do_aio_read", rio->obj_name);
454                         fail(peer, pr);
455                         r = -1;
456                         goto out_buf;
457                 }
458         }
459         else if (rio->state == READING){
460                 XSEGLOG2(&lc, I, "Reading of %s callback", rio->obj_name);
461                 if (pr->retval > 0)
462                         rio->read += pr->retval;
463                 else if (pr->retval == 0) {
464                         XSEGLOG2(&lc, I, "Reading of %s reached end of file at "
465                                 "%llu bytes. Zeroing out rest", rio->obj_name,
466                                 (unsigned long long) req->serviced);
467                         memset(rio->buf + rio->read, 0, req->size - rio->read);
468                         rio->read = req->size ;
469                 }
470                 else {
471                         XSEGLOG2(&lc, E, "Reading of %s failed", rio->second_name);
472                         r = -1;
473                         goto out_buf;
474                 }
475
476                 if (rio->read >= req->size) {
477                         XSEGLOG2(&lc, I, "Reading of %s completed", rio->obj_name);
478                         //do_aio_write
479                         rio->state = WRITING;
480                         XSEGLOG2(&lc, I, "Writing %s", rio->obj_name);
481                         if (do_aio_generic(peer, pr, X_WRITE, rio->obj_name,
482                                         rio->buf, req->size, req->offset) < 0) {
483                                 XSEGLOG2(&lc, E, "Writing of %s failed on do_aio_write", rio->obj_name);
484                                 r = -1;
485                                 goto out_buf;
486                         }
487                         return 0;
488                 }
489
490                 XSEGLOG2(&lc, I, "Resubmitting read of %s", rio->obj_name);
491                 if (do_aio_generic(peer, pr, X_READ, rio->second_name, rio->buf + rio->read,
492                         req->size - rio->read, req->offset + rio->read) < 0) {
493                         XSEGLOG2(&lc, E, "Reading of %s failed on do_aio_read",
494                                         rio->obj_name);
495                         r = -1;
496                         goto out_buf;
497                 }
498         }
499         else if (rio->state == WRITING){
500                 XSEGLOG2(&lc, I, "Writing of %s callback", rio->obj_name);
501                 if (pr->retval == 0) {
502                         XSEGLOG2(&lc, I, "Writing of %s completed", rio->obj_name);
503                         XSEGLOG2(&lc, I, "Copy of object %s to object %s completed", rio->second_name, rio->obj_name);
504                         req->serviced = req->size;
505                         r = 0;
506                         goto out_buf;
507                 }
508                 else {
509                         XSEGLOG2(&lc, E, "Writing of %s failed", rio->obj_name);
510                         XSEGLOG2(&lc, E, "Copy of object %s to object %s failed", rio->second_name, rio->obj_name);
511                         r = -1;
512                         goto out_buf;
513                 }
514         }
515         else {
516                 XSEGLOG2(&lc, E, "Unknown state");
517         }
518         return 0;
519
520
521 out_buf:
522         free(rio->buf);
523 out_src:
524         free(rio->second_name);
525
526         rio->buf = NULL;
527         rio->second_name = NULL;
528         rio->read = 0;
529
530         if (r < 0)
531                 fail(peer ,pr);
532         else
533                 complete(peer, pr);
534         return 0;
535 }
536
537 int handle_snapshot(struct peerd *peer, struct peer_req *pr)
538 {
539         //struct radosd *rados = (struct radosd *) peer->priv;
540         struct xseg_request *req = pr->req;
541         struct rados_io *rio = (struct rados_io *) pr->priv;
542         int r;
543
544         if (rio->state == ACCEPTED){
545                 struct xseg_request_snapshot *xsnapshot;
546                 xsnapshot = (struct xseg_request_snapshot *)xseg_get_data(peer->xseg, req);
547                 (void)xsnapshot; //ignore it
548                 XSEGLOG2(&lc, I, "Starting snapshot of object %s", rio->obj_name);
549                 if (!req->size) {
550                         complete(peer, pr); //or fail?
551                         return 0;
552                 }
553
554                 rio->second_name = malloc(MAX_OBJ_NAME+1);
555                 if (!rio->second_name){
556                         return -1;
557                 }
558                 rio->buf = malloc(req->size);
559                 if (!rio->buf) {
560                         r = -1;
561                         goto out_src;
562                 }
563                 rio->state = READING;
564                 rio->read = 0;
565                 XSEGLOG2(&lc, I, "Reading %s", rio->obj_name);
566                 if (do_aio_generic(peer, pr, X_READ, rio->obj_name, rio->buf + rio->read,
567                         req->size - rio->read, req->offset + rio->read) < 0) {
568                         XSEGLOG2(&lc, I, "Reading of %s failed on do_aio_read", rio->obj_name);
569                         fail(peer, pr);
570                         r = -1;
571                         goto out_buf;
572                 }
573         }
574         else if (rio->state == READING){
575                 XSEGLOG2(&lc, I, "Reading of %s callback", rio->obj_name);
576                 if (pr->retval >= 0)
577                         rio->read += pr->retval;
578                 else {
579                         XSEGLOG2(&lc, E, "Reading of %s failed", rio->second_name);
580                         r = -1;
581                         goto out_buf;
582                 }
583
584                 if (!pr->retval || rio->read >= req->size) {
585                         XSEGLOG2(&lc, I, "Reading of %s completed", rio->obj_name);
586                         //rstrip here in case zeros were written in the end
587                         uint64_t trailing_zeros = 0;
588                         for (;trailing_zeros < rio->read; trailing_zeros++)
589                                 if (rio->buf[rio->read-trailing_zeros -1])
590                                         break;
591                         XSEGLOG2(&lc, D, "Read %llu, Trainling zeros %llu",
592                                         rio->read, trailing_zeros);
593
594                         rio->read -= trailing_zeros;
595                         //calculate snapshot name
596                         unsigned char sha[SHA256_DIGEST_SIZE];
597                         SHA256((unsigned char *) rio->buf, rio->read, sha);
598                         r = xseg_resize_request(peer->xseg, pr->req, pr->req->targetlen,
599                                         sizeof(struct xseg_reply_snapshot));
600
601                         struct xseg_reply_snapshot *xreply;
602                         xreply = (struct xseg_reply_snapshot *)xseg_get_data(peer->xseg, req);
603                         hexlify(sha, xreply->target);
604                         xreply->targetlen = HEXLIFIED_SHA256_DIGEST_SIZE;
605
606                         strncpy(rio->second_name, xreply->target, xreply->targetlen);
607                         rio->second_name[xreply->targetlen] = 0;
608                         XSEGLOG2(&lc, I, "Calculated %s as snapshot of %s",
609                                         rio->second_name, rio->obj_name);
610
611                         //aio_stat
612                         rio->state = STATING;
613                         r = do_aio_generic(peer, pr, X_INFO, rio->second_name, NULL, 0, 0);
614                         if (r < 0){
615                                 XSEGLOG2(&lc, E, "Stating %s failed", rio->second_name_name);
616                                 r = -1;
617                                 goto out_buf;
618                         }
619                         return 0;
620                 }
621                 XSEGLOG2(&lc, I, "Resubmitting read of %s", rio->obj_name);
622                 if (do_aio_generic(peer, pr, X_READ, rio->obj_name, rio->buf + rio->read,
623                         req->size - rio->read, req->offset + rio->read) < 0) {
624                         XSEGLOG2(&lc, E, "Reading of %s failed on do_aio_read",
625                                         rio->obj_name);
626                         r = -1;
627                         goto out_buf;
628                 }
629                 return 0;
630         } else if (rio->state == STATING){
631                 if (pr->retval < 0){
632                         //write
633                         XSEGLOG2(&lc, I, "Stating %s failed. Writing.",
634                                                         rio->second_name);
635                         rio->state = WRITING;
636                         if (do_aio_generic(peer, pr, X_WRITE, rio->second_name,
637                                                 rio->buf, rio->read, 0) < 0) {
638                                 XSEGLOG2(&lc, E, "Writing of %s failed on do_aio_write", rio->second_name);
639                                 r = -1;
640                                 goto out_buf;
641                         }
642                         return 0;
643                 }
644                 else {
645                         XSEGLOG2(&lc, I, "Stating %s completed Successfully."
646                                         "No need to write.", rio->second_name);
647                         r = 0;
648                         goto out_buf;
649                 }
650
651         }
652         else if (rio->state == WRITING){
653                 struct xseg_reply_snapshot *xreply;
654                 xreply = (struct xseg_reply_snapshot *)xseg_get_data(peer->xseg, req);
655                 (void)xreply;
656                 XSEGLOG2(&lc, I, "Writing of %s callback", rio->obj_name);
657                 if (pr->retval == 0) {
658                         XSEGLOG2(&lc, I, "Writing of %s completed", rio->second_name);
659                         XSEGLOG2(&lc, I, "Snapshot of object %s to object %s completed",
660                                         rio->obj_name, rio->second_name);
661                         req->serviced = req->size;
662                         r = 0;
663                         goto out_buf;
664                 }
665                 else {
666                         XSEGLOG2(&lc, E, "Writing of %s failed", rio->obj_name);
667                         XSEGLOG2(&lc, E, "Snapshot of object %s failed",
668                                                                 rio->obj_name);
669                         r = -1;
670                         goto out_buf;
671                 }
672         }
673         else {
674                 XSEGLOG2(&lc, E, "Unknown state");
675         }
676         return 0;
677
678
679 out_buf:
680         free(rio->buf);
681 out_src:
682         free(rio->second_name);
683
684         rio->buf = NULL;
685         rio->second_name = NULL;
686         rio->read = 0;
687
688         if (r < 0)
689                 fail(peer ,pr);
690         else
691                 complete(peer, pr);
692         return 0;
693 }
694
695 int spawnthread(struct peerd *peer, struct peer_req *pr,
696                         void *(*func)(void *arg))
697 {
698         //struct radosd *rados = (struct radosd *) peer->priv;
699         struct rados_io *rio = (struct rados_io *) (pr->priv);
700
701         pthread_attr_t attr;
702         pthread_attr_init(&attr);
703         pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
704
705         return (pthread_create(&rio->tid, &attr, func, (void *) pr));
706 }
707
708 void watch_cb(uint8_t opcode, uint64_t ver, void *arg)
709 {
710         //assert pr valid
711         struct peer_req *pr = (struct peer_req *)arg;
712         //struct radosd *rados = (struct radosd *) pr->peer->priv;
713         struct rados_io *rio = (struct rados_io *) (pr->priv);
714
715         if (pr->req->op == X_ACQUIRE){
716                 XSEGLOG2(&lc, I, "watch cb signaling rio of %s", rio->obj_name);
717                 pthread_cond_signal(&rio->cond);
718         }
719         else
720                 XSEGLOG2(&lc, E, "Invalid req op in watch_cb");
721 }
722
723 void * lock_op(void *arg)
724 {
725         struct peer_req *pr = (struct peer_req *)arg;
726         struct radosd *rados = (struct radosd *) pr->peer->priv;
727         struct rados_io *rio = (struct rados_io *) (pr->priv);
728         uint32_t len = strlen(rio->obj_name);
729         strncpy(rio->obj_name + len, LOCK_SUFFIX, LOCK_SUFFIX_LEN);
730         rio->obj_name[len + LOCK_SUFFIX_LEN] = 0;
731
732         XSEGLOG2(&lc, I, "Starting lock op for %s", rio->obj_name);
733         if (!(pr->req->flags & XF_NOSYNC)){
734                 if (rados_watch(rados->ioctx, rio->obj_name, 0,
735                                 &rio->watch_handle, watch_cb, pr) < 0){
736                         XSEGLOG2(&lc, E, "Rados watch failed for %s",
737                                         rio->obj_name);
738                         fail(pr->peer, pr);
739                         return NULL;
740                 }
741         }
742
743         while(rados_lock(rados->ioctx, rio->obj_name, RADOS_LOCK_NAME,
744                 C_LOCK_EXCLUSIVE, RADOS_LOCK_COOKIE, "", "", NULL, 0) < 0){
745                 if (pr->req->flags & XF_NOSYNC){
746                         XSEGLOG2(&lc, E, "Rados lock failed for %s",
747                                         rio->obj_name);
748                         fail(pr->peer, pr);
749                         return NULL;
750                 }
751                 else{
752                         XSEGLOG2(&lc, D, "rados lock for %s sleeping",
753                                         rio->obj_name);
754                         pthread_mutex_lock(&rio->m);
755                         pthread_cond_wait(&rio->cond, &rio->m);
756                         pthread_mutex_unlock(&rio->m);
757                         XSEGLOG2(&lc, D, "rados lock for %s woke up",
758                                         rio->obj_name);
759                 }
760         }
761         if (!(pr->req->flags & XF_NOSYNC)){
762                 if (rados_unwatch(rados->ioctx, rio->obj_name,
763                                         rio->watch_handle) < 0){
764                         XSEGLOG2(&lc, E, "Rados unwatch failed");
765                 }
766         }
767         XSEGLOG2(&lc, I, "Successfull lock op for %s", rio->obj_name);
768         complete(pr->peer, pr);
769         return NULL;
770 }
771
772 void * unlock_op(void *arg)
773 {
774         struct peer_req *pr = (struct peer_req *)arg;
775         struct radosd *rados = (struct radosd *) pr->peer->priv;
776         struct rados_io *rio = (struct rados_io *) (pr->priv);
777         uint32_t len = strlen(rio->obj_name);
778         strncpy(rio->obj_name + len, LOCK_SUFFIX, LOCK_SUFFIX_LEN);
779         rio->obj_name[len + LOCK_SUFFIX_LEN] = 0;
780         int r;
781         XSEGLOG2(&lc, I, "Starting unlock op for %s", rio->obj_name);
782         if (pr->req->flags & XF_FORCE)
783                 r = rados_break_lock(rados->ioctx, rio->obj_name, RADOS_LOCK_NAME,
784                         RADOS_LOCK_COOKIE);
785         else
786                 r = rados_unlock(rados->ioctx, rio->obj_name, RADOS_LOCK_NAME,
787                         RADOS_LOCK_COOKIE);
788         if (r < 0){
789                 XSEGLOG2(&lc, E, "Rados unlock failed for %s (r: %d)", rio->obj_name, r);
790                 fail(pr->peer, pr);
791         }
792         else {
793                 if (rados_notify(rados->ioctx, rio->obj_name, 
794                                         0, NULL, 0) < 0) {
795                         XSEGLOG2(&lc, E, "rados notify failed");
796                 }
797                 XSEGLOG2(&lc, I, "Successfull unlock op for %s", rio->obj_name);
798                 complete(pr->peer, pr);
799         }
800         return NULL;
801 }
802
803 int handle_acquire(struct peerd *peer, struct peer_req *pr)
804 {
805         int r = spawnthread(peer, pr, lock_op);
806         if (r < 0)
807                 fail(pr->peer, pr);
808         return 0;
809 }
810
811
812 int handle_release(struct peerd *peer, struct peer_req *pr)
813 {
814         int r = spawnthread(peer, pr, unlock_op);
815         if (r < 0)
816                 fail(pr->peer, pr);
817         return 0;
818 }
819
820 int custom_peer_init(struct peerd *peer, int argc, char *argv[])
821 {
822         int i, j;
823         struct radosd *rados = malloc(sizeof(struct radosd));
824         struct rados_io *rio;
825         if (!rados) {
826                 perror("malloc");
827                 return -1;
828         }
829         rados->pool[0] = 0;
830
831         BEGIN_READ_ARGS(argc, argv);
832         READ_ARG_STRING("--pool", rados->pool, MAX_POOL_NAME);
833         END_READ_ARGS();
834
835         if (!rados->pool[0]){
836                 XSEGLOG2(&lc, E , "Pool must be provided");
837                 free(rados);
838                 usage(argv[0]);
839                 return -1;
840         }
841
842         if (rados_create(&rados->cluster, NULL) < 0) {
843                 XSEGLOG2(&lc, E, "Rados create failed!");
844                 return -1;
845         }
846         if (rados_conf_read_file(rados->cluster, NULL) < 0){
847                 XSEGLOG2(&lc, E, "Error reading rados conf files!");
848                 return -1;
849         }
850         if (rados_connect(rados->cluster) < 0) {
851                 XSEGLOG2(&lc, E, "Rados connect failed!");
852                 rados_shutdown(rados->cluster);
853                 free(rados);
854                 return 0;
855         }
856         if (rados_pool_lookup(rados->cluster, rados->pool) < 0) {
857                 XSEGLOG2(&lc, E, "Pool does not exists. Try creating it first");
858                 rados_shutdown(rados->cluster);
859                 free(rados);
860                 return -1;
861                 /*
862                 if (rados_pool_create(rados->cluster, rados->pool) < 0){
863                         XSEGLOG2(&lc, E, "Couldn't create pool %s", rados->pool);
864                         rados_shutdown(rados->cluster);
865                         free(rados);
866                         return -1;
867                 }
868                 XSEGLOG2(&lc, I, "Pool created.");
869                 */
870
871         }
872         if (rados_ioctx_create(rados->cluster, rados->pool, &(rados->ioctx)) < 0){
873                 XSEGLOG2(&lc, E, "ioctx create problem.");
874                 rados_shutdown(rados->cluster);
875                 free(rados);
876                 return -1;
877         }
878         peer->priv = (void *) rados;
879         for (i = 0; i < peer->nr_ops; i++) {
880                 rio = malloc(sizeof(struct rados_io));
881                 if (!rio) {
882                         //ugly
883                         //is this really necessary?
884                         for (j = 0; j < i; j++) {
885                                 free(peer->peer_reqs[j].priv);
886                         }
887                         free(rados);
888                         perror("malloc");
889                         return -1;
890                 }
891                 rio->buf = 0;
892                 rio->read = 0;
893                 rio->size = 0;
894                 rio->second_name = 0;
895                 rio->watch_handle = 0;
896                 pthread_cond_init(&rio->cond, NULL);
897                 pthread_mutex_init(&rio->m, NULL);
898                 peer->peer_reqs[i].priv = (void *) rio;
899         }
900         return 0;
901 }
902
/*
 * custom_arg_parse - hook for peer-specific argument parsing.
 *
 * Nothing is parsed here for now: both arguments are ignored and the
 * function always returns 0.
 */
int custom_arg_parse(int argc, const char *argv[])
{
        (void) argc;
        (void) argv;

        return 0;
}
908
/*
 * custom_peer_finalize - hook called to tear the peer down.
 *
 * Currently a no-op: @peer is not touched and nothing is released.
 */
void custom_peer_finalize(struct peerd *peer)
{
        (void) peer;
}
913
914 int dispatch(struct peerd *peer, struct peer_req *pr, struct xseg_request *req,
915                 enum dispatch_reason reason)
916 {
917         struct rados_io *rio = (struct rados_io *) (pr->priv);
918         char *target = xseg_get_target(peer->xseg, pr->req);
919         unsigned int end = (pr->req->targetlen > MAX_OBJ_NAME) ? MAX_OBJ_NAME : pr->req->targetlen;
920         strncpy(rio->obj_name, target, end);
921         rio->obj_name[end] = 0;
922         //log_pr("dispatch", pr);
923         if (reason == dispatch_accept)
924                 rio->state = ACCEPTED;
925
926         switch (pr->req->op){
927                 case X_READ:
928                         handle_read(peer, pr); break;
929                 case X_WRITE: 
930                         handle_write(peer, pr); break;
931                 case X_DELETE:
932                         if (canDefer(peer))
933                                 defer_request(peer, pr);
934                         else
935                                 handle_delete(peer, pr);
936                         break;
937                 case X_INFO:
938                         if (canDefer(peer))
939                                 defer_request(peer, pr);
940                         else
941                                 handle_info(peer, pr);
942                         break;
943                 case X_COPY:
944                         if (canDefer(peer))
945                                 defer_request(peer, pr);
946                         else
947                                 handle_copy(peer, pr);
948                         break;
949                 case X_ACQUIRE:
950                         handle_acquire(peer, pr); break;
951                 case X_RELEASE:
952                         handle_release(peer, pr); break;
953                 case X_SNAPSHOT:
954                         handle_snapshot(peer, pr); break;
955
956                 default:
957                         fail(peer, pr);
958         }
959         return 0;
960 }