Strip xseg stuff
[archipelago] / xseg / mt-sosd.c
1 /*
2  * Copyright 2012 GRNET S.A. All rights reserved.
3  *
4  * Redistribution and use in source and binary forms, with or
5  * without modification, are permitted provided that the following
6  * conditions are met:
7  *
8  *   1. Redistributions of source code must retain the above
9  *      copyright notice, this list of conditions and the following
10  *      disclaimer.
11  *   2. Redistributions in binary form must reproduce the above
12  *      copyright notice, this list of conditions and the following
13  *      disclaimer in the documentation and/or other materials
14  *      provided with the distribution.
15  *
16  * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17  * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18  * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19  * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20  * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23  * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24  * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25  * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26  * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27  * POSSIBILITY OF SUCH DAMAGE.
28  *
29  * The views and conclusions contained in the software and
30  * documentation are those of the authors and should not be
31  * interpreted as representing official policies, either expressed
32  * or implied, of GRNET S.A.
33  */
34
35 #include <stdio.h>
36 #include <stdlib.h>
37 #include <unistd.h>
38 #include <xseg/xseg.h>
39 #include <peer.h>
40 #include <rados/librados.h>
41 #include <xseg/protocol.h>
42 #include <pthread.h>
43 #include <openssl/sha.h>
44 #include <ctype.h>
45 #include <errno.h>
46 #include <hash.h>
47
48
/*
 * Suffixes appended to an object name to derive the names of auxiliary
 * objects. The *_LEN values are the string lengths without the NUL.
 */
#define LOCK_SUFFIX		"_lock"
#define LOCK_SUFFIX_LEN		5
#define HASH_SUFFIX		"_hash"
#define HASH_SUFFIX_LEN		5

/*
 * Name length limits. Buffers in this file are declared with one extra
 * byte (e.g. MAX_OBJ_NAME + 1) to leave room for the terminating NUL.
 */
#define MAX_POOL_NAME		64
#define MAX_OBJ_NAME		(XSEG_MAX_TARGETLEN + LOCK_SUFFIX_LEN + 1)

/* Lock-related constants; their users lie outside this part of the file. */
#define RADOS_LOCK_NAME		"RadosLock"
//#define RADOS_LOCK_COOKIE "Cookie"
#define RADOS_LOCK_COOKIE	"foo"
#define RADOS_LOCK_TAG		""
#define RADOS_LOCK_DESC		""
61
/* Print the command-line options specific to this peer on stderr. */
void custom_peer_usage()
{
	/* No conversions are needed, so the text is emitted verbatim. */
	fputs("Custom peer options:\n"
	      "--pool: Rados pool to connect\n"
	      "\n", stderr);
}
68
/*
 * Progress of a request, stored in rados_io.state. The handlers in this
 * file branch on it each time they are invoked for the same request.
 * Enumerators take the consecutive values 0..6.
 */
enum rados_state {
	ACCEPTED,	/* 0: handlers treat this as "nothing submitted yet" */
	PENDING,	/* 1: set by handle_delete/handle_info after submitting */
	READING,	/* 2: an object read has been submitted */
	WRITING,	/* 3: an object write has been submitted */
	STATING,	/* 4: handle_hash has submitted a stat */
	PREHASHING,	/* 5: handle_hash is reading the "<name>_hash" object */
	POSTHASHING	/* 6: handle_hash is writing the "<name>_hash" object */
};
78
79 struct radosd {
80         rados_t cluster;
81         rados_ioctx_t ioctx;
82         char pool[MAX_POOL_NAME + 1];
83 };
84
85 struct rados_io{
86         char obj_name[MAX_OBJ_NAME + 1];
87         enum rados_state state;
88         uint64_t size;
89         char *second_name, *buf;
90         uint64_t read;
91         uint64_t watch_handle;
92         pthread_t tid;
93         pthread_cond_t cond;
94         pthread_mutex_t m;
95 };
96
97 void rados_ack_cb(rados_completion_t c, void *arg)
98 {
99         struct peer_req *pr = (struct peer_req*) arg;
100         struct peerd *peer = pr->peer;
101         int ret = rados_aio_get_return_value(c);
102         pr->retval = ret;
103         rados_aio_release(c);
104         dispatch(peer, pr, pr->req, dispatch_internal);
105 }
106
107 void rados_commit_cb(rados_completion_t c, void *arg)
108 {
109         struct peer_req *pr = (struct peer_req*) arg;
110         struct peerd *peer = pr->peer;
111         int ret = rados_aio_get_return_value(c);
112         pr->retval = ret;
113         rados_aio_release(c);
114         dispatch(peer, pr, pr->req, dispatch_internal);
115 }
116
117 static int do_aio_generic(struct peerd *peer, struct peer_req *pr, uint32_t op,
118                 char *target, char *buf, uint64_t size, uint64_t offset)
119 {
120         struct radosd *rados = (struct radosd *) peer->priv;
121         struct rados_io *rio = (struct rados_io *) pr->priv;
122         int r;
123
124         rados_completion_t rados_compl;
125         switch (op) {
126                 case X_READ:
127                         r = rados_aio_create_completion(pr, rados_ack_cb, NULL, &rados_compl);
128                         if (r < 0)
129                                 return -1;
130                         r = rados_aio_read(rados->ioctx, target, rados_compl,
131                                         buf, size, offset);
132                         break;
133                 case X_WRITE:
134                         r = rados_aio_create_completion(pr, NULL, rados_commit_cb, &rados_compl);
135                         if (r < 0)
136                                 return -1;
137                         r = rados_aio_write(rados->ioctx, target, rados_compl,
138                                         buf, size, offset);
139                         break;
140                 case X_DELETE:
141                         r = rados_aio_create_completion(pr, rados_ack_cb, NULL, &rados_compl);
142                         if (r < 0)
143                                 return -1;
144                         r = rados_aio_remove(rados->ioctx, target, rados_compl);
145                         break;
146                 case X_INFO:
147                         r = rados_aio_create_completion(pr, rados_ack_cb, NULL, &rados_compl);
148                         if (r < 0)
149                                 return -1;
150                         r = rados_aio_stat(rados->ioctx, target, rados_compl, &rio->size, NULL); 
151                         break;
152                 default:
153                         return -1;
154                         break;
155         }
156         if (r < 0) {
157                 rados_aio_release(rados_compl);
158         }
159         return r;
160 }
161
162 static int do_aio_read(struct peerd *peer, struct peer_req *pr)
163 {
164         struct xseg_request *req = pr->req;
165         struct rados_io *rio = (struct rados_io *) pr->priv;
166         char *data = xseg_get_data(peer->xseg, pr->req);
167
168         return do_aio_generic(peer, pr, X_READ, rio->obj_name,
169                         data + req->serviced,
170                         req->size - req->serviced,
171                         req->offset + req->serviced);
172 }
173
174 static int do_aio_write(struct peerd *peer, struct peer_req *pr)
175 {
176         struct xseg_request *req = pr->req;
177         struct rados_io *rio = (struct rados_io *) pr->priv;
178         char *data = xseg_get_data(peer->xseg, pr->req);
179
180         return do_aio_generic(peer, pr, X_WRITE, rio->obj_name,
181                         data + req->serviced,
182                         req->size - req->serviced,
183                         req->offset + req->serviced);
184 }
185
186 int handle_delete(struct peerd *peer, struct peer_req *pr)
187 {
188         int r;
189         //struct radosd *rados = (struct radosd *) peer->priv;
190         struct rados_io *rio = (struct rados_io *) pr->priv;
191
192         if (rio->state == ACCEPTED) {
193                 XSEGLOG2(&lc, I, "Deleting %s", rio->obj_name);
194                 rio->state = PENDING;
195                 r = do_aio_generic(peer, pr, X_DELETE, rio->obj_name, NULL, 0, 0);
196                 if (r < 0) {
197                         XSEGLOG2(&lc, E, "Deletion of %s failed", rio->obj_name);
198                         fail(peer, pr);
199                 }
200         }
201         else {
202                 if (pr->retval < 0){
203                         XSEGLOG2(&lc, E, "Deletion of %s failed", rio->obj_name);
204                         fail(peer, pr);
205                 }
206                 else {
207                         XSEGLOG2(&lc, I, "Deletion of %s completed", rio->obj_name);
208                         complete(peer, pr);
209                 }
210         }
211         return 0;
212 }
213
214 int handle_info(struct peerd *peer, struct peer_req *pr)
215 {
216         int r;
217         struct xseg_request *req = pr->req;
218         //struct radosd *rados = (struct radosd *) peer->priv;
219         struct rados_io *rio = (struct rados_io *) pr->priv;
220         char *req_data;
221         struct xseg_reply_info *xinfo;
222         char buf[XSEG_MAX_TARGETLEN + 1];
223         char *target;
224
225         if (rio->state == ACCEPTED) {
226                 XSEGLOG2(&lc, I, "Getting info of %s", rio->obj_name);
227                 rio->state = PENDING;
228                 r = do_aio_generic(peer, pr, X_INFO, rio->obj_name, NULL, 0, 0);
229                 if (r < 0) {
230                         XSEGLOG2(&lc, E, "Getting info of %s failed", rio->obj_name);   
231                         fail(peer, pr);
232                 }
233         }
234         else {
235                 if (req->datalen < sizeof(struct xseg_reply_info)) {
236                         target = xseg_get_target(peer->xseg, req);
237                         strncpy(buf, target, req->targetlen);
238                         r = xseg_resize_request(peer->xseg, req, req->targetlen, sizeof(struct xseg_reply_info));
239                         if (r < 0) {
240                                 XSEGLOG2(&lc, E, "Cannot resize request");
241                                 fail(peer, pr);
242                                 return 0;
243                         }
244                         target = xseg_get_target(peer->xseg, req);
245                         strncpy(target, buf, req->targetlen);
246                 }
247
248                 req_data = xseg_get_data(peer->xseg, req);
249                 xinfo = (struct xseg_reply_info *)req_data;
250                 if (pr->retval < 0){
251                         xinfo->size = 0;
252                         XSEGLOG2(&lc, E, "Getting info of %s failed", rio->obj_name);   
253                         fail(peer, pr);
254                 }
255                 else {
256                         xinfo->size = rio->size;
257                         pr->retval = sizeof(uint64_t);
258                         XSEGLOG2(&lc, I, "Getting info of %s completed", rio->obj_name);        
259                         complete(peer, pr);
260                 }
261         }
262         return 0;
263 }
264
265 int handle_read(struct peerd *peer, struct peer_req *pr)
266 {
267         struct rados_io *rio = (struct rados_io *) (pr->priv);
268         struct xseg_request *req = pr->req;
269         char *data;
270
271         if (req->datalen < req->size) {
272                 XSEGLOG2(&lc, E, "Request datalen is less than req size");
273                 return -1;
274         }
275
276         if (rio->state == ACCEPTED) {
277                 if (!req->size) {
278                         complete(peer, pr);
279                         return 0;
280                 }
281                 rio->state = READING;
282                 XSEGLOG2(&lc, I, "Reading %s", rio->obj_name);
283                 if (do_aio_read(peer, pr) < 0) {
284                         XSEGLOG2(&lc, I, "Reading of %s failed on do_aio_read",
285                                                 rio->obj_name);
286                         fail(peer, pr);
287                 }
288         }
289         else if (rio->state == READING) {
290                 XSEGLOG2(&lc, I, "Reading of %s callback", rio->obj_name);
291                 data = xseg_get_data(peer->xseg, pr->req);
292                 if (pr->retval > 0)
293                         req->serviced += pr->retval;
294                 else if (pr->retval == 0) {
295                         XSEGLOG2(&lc, I, "Reading of %s reached end of file at "
296                                 "%llu bytes. Zeroing out rest", rio->obj_name,
297                                 (unsigned long long) req->serviced);
298                         /* reached end of object. zero out rest of data
299                          * requested from this object
300                          */
301                         memset(data + req->serviced, 0, req->size - req->serviced);
302                         req->serviced = req->size;
303                 }
304                 else if (pr->retval == -2) {
305                         XSEGLOG2(&lc, I, "Reading of %s return -2. "
306                                         "Zeroing out data", rio->obj_name);
307                         /* object not found. return zeros instead */
308                         memset(data, 0, req->size);
309                         req->serviced = req->size;
310                 }
311                 else {
312                         XSEGLOG2(&lc, E, "Reading of %s failed", rio->obj_name);
313                         /* pr->retval < 0 && pr->retval != -2 */
314                         fail(peer, pr);
315                         return 0;
316                 }
317                 if (req->serviced >= req->size) {
318                         XSEGLOG2(&lc, I, "Reading of %s completed", rio->obj_name);
319                         complete(peer, pr);
320                         return 0;
321                 }
322
323                 if (!req->size) {
324                         /* should not happen */
325                         fail(peer, pr);
326                         return 0;
327                 }
328                 /* resubmit */
329                 XSEGLOG2(&lc, I, "Resubmitting read of %s", rio->obj_name);
330                 if (do_aio_read(peer, pr) < 0) {
331                         XSEGLOG2(&lc, E, "Reading of %s failed on do_aio_read",
332                                         rio->obj_name);
333                         fail(peer, pr);
334                 }
335         }
336         else {
337                 /* should not reach this */
338                 printf("read request reached this\n");
339                 fail(peer, pr);
340         }
341         return 0;
342 }
343
344 int handle_write(struct peerd *peer, struct peer_req *pr)
345 {
346         if (pr->req->datalen < pr->req->size) {
347                 XSEGLOG2(&lc, E, "Request datalen is less than req size");
348                 return -1;
349         }
350
351         struct rados_io *rio = (struct rados_io *) (pr->priv);
352         struct xseg_request *req = pr->req;
353         if (rio->state == ACCEPTED) {
354                 if (!req->size) {
355                         // for future use
356                         if (req->flags & XF_FLUSH) {
357                                 complete(peer, pr);
358                                 return 0;
359                         }
360                         else {
361                                 complete(peer, pr);
362                                 return 0;
363                         }
364                 }
365                 //should we ensure req->op = X_READ ?
366                 rio->state = WRITING;
367                 XSEGLOG2(&lc, I, "Writing %s", rio->obj_name);
368                 if (do_aio_write(peer, pr) < 0) {
369                         XSEGLOG2(&lc, E, "Writing of %s failed on do_aio_write",
370                                         rio->obj_name);
371                         fail(peer, pr);
372                 }
373         }
374         else if (rio->state == WRITING) {
375                 /* rados writes return 0 if write succeeded or < 0 if failed
376                  * no resubmission occurs
377                  */
378                 XSEGLOG2(&lc, I, "Writing of %s callback", rio->obj_name);
379                 if (pr->retval == 0) {
380                         XSEGLOG2(&lc, I, "Writing of %s completed", rio->obj_name);
381                         req->serviced = req->size;
382                         complete(peer, pr);
383                         return 0;
384                 }
385                 else {
386                         XSEGLOG2(&lc, E, "Writing of %s failed", rio->obj_name);
387                         fail(peer, pr);
388                         return 0;
389                 }
390         }
391         else {
392                 /* should not reach this */
393                 printf("write request reached this\n");
394                 fail(peer, pr);
395         }
396         return 0;
397 }
398
399 int handle_copy(struct peerd *peer, struct peer_req *pr)
400 {
401         //struct radosd *rados = (struct radosd *) peer->priv;
402         struct xseg_request *req = pr->req;
403         struct rados_io *rio = (struct rados_io *) pr->priv;
404         int r;
405         struct xseg_request_copy *xcopy = (struct xseg_request_copy *)xseg_get_data(peer->xseg, req);
406
407         if (rio->state == ACCEPTED){
408                 XSEGLOG2(&lc, I, "Copy of object %s to object %s started",
409                                 rio->second_name, rio->obj_name);
410                 if (!req->size) {
411                         complete(peer, pr); //or fail?
412                         return 0;
413                 }
414
415                 rio->second_name = malloc(MAX_OBJ_NAME + 1);
416                 if (!rio->second_name){
417                         fail(peer, pr);
418                         return -1;
419                 }
420                 //NULL terminate or fail if targetlen > MAX_OBJ_NAME ?
421                 unsigned int end = (xcopy->targetlen > MAX_OBJ_NAME) ? MAX_OBJ_NAME : xcopy->targetlen;
422                 strncpy(rio->second_name, xcopy->target, end);
423                 rio->second_name[end] = 0;
424
425                 rio->buf = malloc(req->size);
426                 if (!rio->buf) {
427                         r = -1;
428                         goto out_src;
429                 }
430
431                 rio->state = READING;
432                 rio->read = 0;
433                 XSEGLOG2(&lc, I, "Reading %s", rio->second_name);
434                 if (do_aio_generic(peer, pr, X_READ, rio->second_name, rio->buf + rio->read,
435                         req->size - rio->read, req->offset + rio->read) < 0) {
436                         XSEGLOG2(&lc, I, "Reading of %s failed on do_aio_read", rio->obj_name);
437                         fail(peer, pr);
438                         r = -1;
439                         goto out_buf;
440                 }
441         }
442         else if (rio->state == READING){
443                 XSEGLOG2(&lc, I, "Reading of %s callback", rio->obj_name);
444                 if (pr->retval > 0)
445                         rio->read += pr->retval;
446                 else if (pr->retval == 0) {
447                         XSEGLOG2(&lc, I, "Reading of %s reached end of file at "
448                                 "%llu bytes. Zeroing out rest", rio->obj_name,
449                                 (unsigned long long) req->serviced);
450                         memset(rio->buf + rio->read, 0, req->size - rio->read);
451                         rio->read = req->size ;
452                 }
453                 else {
454                         XSEGLOG2(&lc, E, "Reading of %s failed", rio->second_name);
455                         r = -1;
456                         goto out_buf;
457                 }
458
459                 if (rio->read >= req->size) {
460                         XSEGLOG2(&lc, I, "Reading of %s completed", rio->obj_name);
461                         //do_aio_write
462                         rio->state = WRITING;
463                         XSEGLOG2(&lc, I, "Writing %s", rio->obj_name);
464                         if (do_aio_generic(peer, pr, X_WRITE, rio->obj_name,
465                                         rio->buf, req->size, req->offset) < 0) {
466                                 XSEGLOG2(&lc, E, "Writing of %s failed on do_aio_write", rio->obj_name);
467                                 r = -1;
468                                 goto out_buf;
469                         }
470                         return 0;
471                 }
472
473                 XSEGLOG2(&lc, I, "Resubmitting read of %s", rio->obj_name);
474                 if (do_aio_generic(peer, pr, X_READ, rio->second_name, rio->buf + rio->read,
475                         req->size - rio->read, req->offset + rio->read) < 0) {
476                         XSEGLOG2(&lc, E, "Reading of %s failed on do_aio_read",
477                                         rio->obj_name);
478                         r = -1;
479                         goto out_buf;
480                 }
481         }
482         else if (rio->state == WRITING){
483                 XSEGLOG2(&lc, I, "Writing of %s callback", rio->obj_name);
484                 if (pr->retval == 0) {
485                         XSEGLOG2(&lc, I, "Writing of %s completed", rio->obj_name);
486                         XSEGLOG2(&lc, I, "Copy of object %s to object %s completed", rio->second_name, rio->obj_name);
487                         req->serviced = req->size;
488                         r = 0;
489                         goto out_buf;
490                 }
491                 else {
492                         XSEGLOG2(&lc, E, "Writing of %s failed", rio->obj_name);
493                         XSEGLOG2(&lc, E, "Copy of object %s to object %s failed", rio->second_name, rio->obj_name);
494                         r = -1;
495                         goto out_buf;
496                 }
497         }
498         else {
499                 XSEGLOG2(&lc, E, "Unknown state");
500         }
501         return 0;
502
503
504 out_buf:
505         free(rio->buf);
506 out_src:
507         free(rio->second_name);
508
509         rio->buf = NULL;
510         rio->second_name = NULL;
511         rio->read = 0;
512
513         if (r < 0)
514                 fail(peer ,pr);
515         else
516                 complete(peer, pr);
517         return 0;
518 }
519
520 int handle_hash(struct peerd *peer, struct peer_req *pr)
521 {
522         //struct radosd *rados = (struct radosd *) peer->priv;
523         struct xseg_request *req = pr->req;
524         struct rados_io *rio = (struct rados_io *) pr->priv;
525         uint64_t trailing_zeros = 0;
526         unsigned char sha[SHA256_DIGEST_SIZE];
527         struct xseg_reply_hash *xreply;
528         int r;
529         char hash_name[HEXLIFIED_SHA256_DIGEST_SIZE + 1];
530         uint32_t pos;
531
532         if (rio->state == ACCEPTED){
533                 XSEGLOG2(&lc, I, "Starting hashing of object %s", rio->obj_name);
534                 if (!req->size) {
535                         fail(peer, pr); //or fail?
536                         return 0;
537                 }
538
539                 rio->second_name = malloc(MAX_OBJ_NAME+1);
540                 if (!rio->second_name){
541                         return -1;
542                 }
543                 rio->buf = malloc(req->size);
544                 if (!rio->buf) {
545                         r = -1;
546                         goto out_src;
547                 }
548
549                 rio->second_name[0] = 0;
550                 rio->state = PREHASHING;
551                 pos = 0;
552                 strncpy(hash_name, rio->obj_name, strlen(rio->obj_name));
553                 pos += strlen(rio->obj_name);
554                 strncpy(hash_name+pos, HASH_SUFFIX, HASH_SUFFIX_LEN);
555                 pos += HASH_SUFFIX_LEN;
556                 hash_name[pos] = 0;
557
558                 if (do_aio_generic(peer, pr, X_READ, hash_name, rio->second_name,
559                         HEXLIFIED_SHA256_DIGEST_SIZE, 0) < 0) {
560                         XSEGLOG2(&lc, I, "Reading of %s failed on do_aio_read", rio->obj_name);
561                         fail(peer, pr);
562                         r = -1;
563                         goto out_buf;
564                 }
565         } else if (rio->state == PREHASHING) {
566                 if (rio->second_name[0] != 0) {
567                         XSEGLOG2(&lc, D, "Precalculated hash found");
568                         xreply = (struct xseg_reply_hash*)xseg_get_data(peer->xseg, req);
569                         r = xseg_resize_request(peer->xseg, pr->req, pr->req->targetlen,
570                                         sizeof(struct xseg_reply_hash));
571                         strncpy(xreply->target, rio->second_name, HEXLIFIED_SHA256_DIGEST_SIZE);
572                         xreply->targetlen = HEXLIFIED_SHA256_DIGEST_SIZE;
573
574                         XSEGLOG2(&lc, I, "Calculated %s as hash of %s",
575                                         rio->second_name, rio->obj_name);
576                         req->serviced = req->size;
577                         goto out_buf;
578
579                 }
580                 rio->state = READING;
581                 rio->read = 0;
582                 XSEGLOG2(&lc, I, "Reading %s", rio->obj_name);
583                 if (do_aio_generic(peer, pr, X_READ, rio->obj_name, rio->buf + rio->read,
584                         req->size - rio->read, req->offset + rio->read) < 0) {
585                         XSEGLOG2(&lc, I, "Reading of %s failed on do_aio_read", rio->obj_name);
586                         fail(peer, pr);
587                         r = -1;
588                         goto out_buf;
589                 }
590         } else if (rio->state == READING){
591                 XSEGLOG2(&lc, I, "Reading of %s callback", rio->obj_name);
592                 if (pr->retval >= 0)
593                         rio->read += pr->retval;
594                 else {
595                         XSEGLOG2(&lc, E, "Reading of %s failed", rio->second_name);
596                         r = -1;
597                         goto out_buf;
598                 }
599
600                 if (!pr->retval || rio->read >= req->size) {
601                         XSEGLOG2(&lc, I, "Reading of %s completed", rio->obj_name);
602                         //rstrip here in case zeros were written in the end
603                         for (;trailing_zeros < rio->read; trailing_zeros++)
604                                 if (rio->buf[rio->read-trailing_zeros -1])
605                                         break;
606                         XSEGLOG2(&lc, D, "Read %llu, Trainling zeros %llu",
607                                         rio->read, trailing_zeros);
608
609                         rio->read -= trailing_zeros;
610                         SHA256((unsigned char *) rio->buf, rio->read, sha);
611                         hexlify(sha, SHA256_DIGEST_SIZE, rio->second_name);
612                         rio->second_name[HEXLIFIED_SHA256_DIGEST_SIZE] = 0;
613
614                         xreply = (struct xseg_reply_hash*)xseg_get_data(peer->xseg, req);
615                         r = xseg_resize_request(peer->xseg, pr->req, pr->req->targetlen,
616                                         sizeof(struct xseg_reply_hash));
617                         strncpy(xreply->target, rio->second_name, HEXLIFIED_SHA256_DIGEST_SIZE);
618                         xreply->targetlen = HEXLIFIED_SHA256_DIGEST_SIZE;
619
620                         XSEGLOG2(&lc, I, "Calculated %s as hash of %s",
621                                         rio->second_name, rio->obj_name);
622
623
624                         //aio_stat
625                         rio->state = STATING;
626                         r = do_aio_generic(peer, pr, X_INFO, rio->second_name, NULL, 0, 0);
627                         if (r < 0){
628                                 XSEGLOG2(&lc, E, "Stating %s failed", rio->second_name);
629                                 r = -1;
630                                 goto out_buf;
631                         }
632                         return 0;
633                 }
634                 XSEGLOG2(&lc, I, "Resubmitting read of %s", rio->obj_name);
635                 if (do_aio_generic(peer, pr, X_READ, rio->obj_name, rio->buf + rio->read,
636                         req->size - rio->read, req->offset + rio->read) < 0) {
637                         XSEGLOG2(&lc, E, "Reading of %s failed on do_aio_read",
638                                         rio->obj_name);
639                         r = -1;
640                         goto out_buf;
641                 }
642                 return 0;
643         } else if (rio->state == STATING){
644                 if (pr->retval < 0){
645                         //write
646                         XSEGLOG2(&lc, I, "Stating %s failed. Writing.",
647                                                         rio->second_name);
648                         rio->state = WRITING;
649                         if (do_aio_generic(peer, pr, X_WRITE, rio->second_name,
650                                                 rio->buf, rio->read, 0) < 0) {
651                                 XSEGLOG2(&lc, E, "Writing of %s failed on do_aio_write", rio->second_name);
652                                 r = -1;
653                                 goto out_buf;
654                         }
655                         return 0;
656                 }
657                 else {
658                         XSEGLOG2(&lc, I, "Stating %s completed Successfully."
659                                         "No need to write.", rio->second_name);
660                         XSEGLOG2(&lc, I, "Hash of object %s to object %s completed",
661                                         rio->obj_name, rio->second_name);
662                         req->serviced = req->size;
663                         r = 0;
664                         goto out_buf;
665                 }
666
667         }
668         else if (rio->state == WRITING){
669                 XSEGLOG2(&lc, I, "Writing of %s callback", rio->obj_name);
670                 if (pr->retval == 0) {
671                         XSEGLOG2(&lc, I, "Writing of %s completed", rio->second_name);
672                         XSEGLOG2(&lc, I, "Hash of object %s to object %s completed",
673                                         rio->obj_name, rio->second_name);
674
675                         pos = 0;
676                         strncpy(hash_name, rio->obj_name, strlen(rio->obj_name));
677                         pos += strlen(rio->obj_name);
678                         strncpy(hash_name+pos, HASH_SUFFIX, HASH_SUFFIX_LEN);
679                         pos += HASH_SUFFIX_LEN;
680                         hash_name[pos] = 0;
681
682                         rio->state = POSTHASHING;
683                         if (do_aio_generic(peer, pr, X_WRITE, hash_name, rio->second_name,
684                                                 HEXLIFIED_SHA256_DIGEST_SIZE, 0) < 0) {
685                                 XSEGLOG2(&lc, E, "Writing of %s failed on do_aio_write", hash_name);
686                                 r = -1;
687                                 goto out_buf;
688                         }
689                         return 0;
690                 }
691                 else {
692                         XSEGLOG2(&lc, E, "Writing of %s failed", rio->obj_name);
693                         XSEGLOG2(&lc, E, "Hash of object %s failed",
694                                                                 rio->obj_name);
695                         r = -1;
696                         goto out_buf;
697                 }
698         } else if (rio->state == POSTHASHING) {
699                 XSEGLOG2(&lc, I, "Writing of prehashed value callback");
700                 if (pr->retval == 0) {
701                         XSEGLOG2(&lc, I, "Writing of prehashed value completed");
702                         XSEGLOG2(&lc, I, "Hash of object %s to object %s completed",
703                                         rio->obj_name, rio->second_name);
704
705                 }
706                 else {
707                         XSEGLOG2(&lc, E, "Writing of prehash failed");
708                 }
709                 req->serviced = req->size;
710                 r = 0;
711                 goto out_buf;
712
713         }
714         else {
715                 XSEGLOG2(&lc, E, "Unknown state");
716         }
717         return 0;
718
719
720 out_buf:
721         free(rio->buf);
722 out_src:
723         free(rio->second_name);
724
725         rio->buf = NULL;
726         rio->second_name = NULL;
727         rio->read = 0;
728
729         if (r < 0)
730                 fail(peer ,pr);
731         else
732                 complete(peer, pr);
733         return 0;
734 }
735
736 int spawnthread(struct peerd *peer, struct peer_req *pr,
737                         void *(*func)(void *arg))
738 {
739         //struct radosd *rados = (struct radosd *) peer->priv;
740         struct rados_io *rio = (struct rados_io *) (pr->priv);
741
742         pthread_attr_t attr;
743         pthread_attr_init(&attr);
744         pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
745
746         return (pthread_create(&rio->tid, &attr, func, (void *) pr));
747 }
748
749 void watch_cb(uint8_t opcode, uint64_t ver, void *arg)
750 {
751         //assert pr valid
752         struct peer_req *pr = (struct peer_req *)arg;
753         //struct radosd *rados = (struct radosd *) pr->peer->priv;
754         struct rados_io *rio = (struct rados_io *) (pr->priv);
755
756         if (pr->req->op == X_ACQUIRE){
757                 XSEGLOG2(&lc, I, "watch cb signaling rio of %s", rio->obj_name);
758                 pthread_cond_signal(&rio->cond);
759         }
760         else
761                 XSEGLOG2(&lc, E, "Invalid req op in watch_cb");
762 }
763
764 void * lock_op(void *arg)
765 {
766         struct peer_req *pr = (struct peer_req *)arg;
767         struct radosd *rados = (struct radosd *) pr->peer->priv;
768         struct rados_io *rio = (struct rados_io *) (pr->priv);
769         uint32_t len = strlen(rio->obj_name);
770         strncpy(rio->obj_name + len, LOCK_SUFFIX, LOCK_SUFFIX_LEN);
771         rio->obj_name[len + LOCK_SUFFIX_LEN] = 0;
772
773         XSEGLOG2(&lc, I, "Starting lock op for %s", rio->obj_name);
774         if (!(pr->req->flags & XF_NOSYNC)){
775                 if (rados_watch(rados->ioctx, rio->obj_name, 0,
776                                 &rio->watch_handle, watch_cb, pr) < 0){
777                         XSEGLOG2(&lc, E, "Rados watch failed for %s",
778                                         rio->obj_name);
779                         fail(pr->peer, pr);
780                         return NULL;
781                 }
782         }
783
784         /* passing flag 1 means renew lock */
785         while(rados_lock_exclusive(rados->ioctx, rio->obj_name, RADOS_LOCK_NAME,
786                 RADOS_LOCK_COOKIE, RADOS_LOCK_DESC, NULL, LIBRADOS_LOCK_FLAG_RENEW) < 0){
787                 if (pr->req->flags & XF_NOSYNC){
788                         XSEGLOG2(&lc, E, "Rados lock failed for %s",
789                                         rio->obj_name);
790                         fail(pr->peer, pr);
791                         return NULL;
792                 }
793                 else{
794                         XSEGLOG2(&lc, D, "rados lock for %s sleeping",
795                                         rio->obj_name);
796                         pthread_mutex_lock(&rio->m);
797                         pthread_cond_wait(&rio->cond, &rio->m);
798                         pthread_mutex_unlock(&rio->m);
799                         XSEGLOG2(&lc, D, "rados lock for %s woke up",
800                                         rio->obj_name);
801                 }
802         }
803         if (!(pr->req->flags & XF_NOSYNC)){
804                 if (rados_unwatch(rados->ioctx, rio->obj_name,
805                                         rio->watch_handle) < 0){
806                         XSEGLOG2(&lc, E, "Rados unwatch failed");
807                 }
808         }
809         XSEGLOG2(&lc, I, "Successfull lock op for %s", rio->obj_name);
810         complete(pr->peer, pr);
811         return NULL;
812 }
813
814 int break_lock(struct radosd *rados, struct rados_io *rio)
815 {
816         int r, exclusive;
817         char *tag = NULL, *clients = NULL, *cookies = NULL, *addrs = NULL;
818         size_t tag_len = 1024, clients_len = 1024, cookies_len = 1024;
819         size_t addrs_len = 1024;
820         ssize_t nr_lockers;
821
822         for (;;) {
823                 tag = malloc(sizeof(char) * tag_len);
824                 clients = malloc(sizeof(char) * clients_len);
825                 cookies = malloc(sizeof(char) * cookies_len);
826                 addrs = malloc(sizeof(char) * addrs_len);
827                 if (!tag || !clients || !cookies || !addrs) {
828                         XSEGLOG2(&lc, E, "Out of memmory");
829                         r = -1;
830                         break;
831                 }
832
833                 nr_lockers = rados_list_lockers(rados->ioctx, rio->obj_name,
834                                 RADOS_LOCK_NAME, &exclusive, tag, &tag_len,
835                                 clients, &clients_len, cookies, &cookies_len,
836                                 addrs, &addrs_len);
837                 if (nr_lockers < 0 && nr_lockers != -ERANGE) {
838                         XSEGLOG2(&lc, E, "Could not list lockers for %s", rio->obj_name);
839                         r = -1;
840                         break;
841                 } else if (nr_lockers == -ERANGE) {
842                         free(tag);
843                         free(clients);
844                         free(cookies);
845                         free(addrs);
846                 } else {
847                         if (nr_lockers != 1) {
848                                 XSEGLOG2(&lc, E, "Number of lockers for %s != 1 !(%d)",
849                                                 rio->obj_name, nr_lockers);
850                                 r = -1;
851                                 break;
852                         } else if (!exclusive) {
853                                 XSEGLOG2(&lc, E, "Lock for %s is not exclusive",
854                                                 rio->obj_name);
855                                 r = -1;
856                                 break;
857                         } else if (strcmp(RADOS_LOCK_TAG, tag)) {
858                                 XSEGLOG2(&lc, E, "List lockers returned wrong tag "
859                                                 "(\"%s\" vs \"%s\")", tag, RADOS_LOCK_TAG);
860                                 r = -1;
861                                 break;
862                         }
863                         r = rados_break_lock(rados->ioctx, rio->obj_name,
864                                 RADOS_LOCK_NAME, clients, RADOS_LOCK_COOKIE);
865                         break;
866                 }
867         }
868
869         free(tag);
870         free(clients);
871         free(cookies);
872         free(addrs);
873
874         return r;
875 }
876
877 void * unlock_op(void *arg)
878 {
879         struct peer_req *pr = (struct peer_req *)arg;
880         struct radosd *rados = (struct radosd *) pr->peer->priv;
881         struct rados_io *rio = (struct rados_io *) (pr->priv);
882         uint32_t len = strlen(rio->obj_name);
883         strncpy(rio->obj_name + len, LOCK_SUFFIX, LOCK_SUFFIX_LEN);
884         rio->obj_name[len + LOCK_SUFFIX_LEN] = 0;
885         int r;
886
887         XSEGLOG2(&lc, I, "Starting unlock op for %s", rio->obj_name);
888         if (pr->req->flags & XF_FORCE) {
889                 r = break_lock(rados, rio);
890         }
891         else {
892                 r = rados_unlock(rados->ioctx, rio->obj_name, RADOS_LOCK_NAME,
893                         RADOS_LOCK_COOKIE);
894         }
895         /* ENOENT means that the lock did not existed.
896          * This still counts as a successfull unlock operation
897          */
898         //if (r < 0 && r != -ENOENT){
899         if (r < 0){
900                 XSEGLOG2(&lc, E, "Rados unlock failed for %s (r: %d)", rio->obj_name, r);
901                 fail(pr->peer, pr);
902         }
903         else {
904                 if (rados_notify(rados->ioctx, rio->obj_name, 
905                                         0, NULL, 0) < 0) {
906                         XSEGLOG2(&lc, E, "rados notify failed");
907                 }
908                 XSEGLOG2(&lc, I, "Successfull unlock op for %s", rio->obj_name);
909                 complete(pr->peer, pr);
910         }
911         return NULL;
912 }
913
914 int handle_acquire(struct peerd *peer, struct peer_req *pr)
915 {
916         int r = spawnthread(peer, pr, lock_op);
917         if (r < 0)
918                 fail(pr->peer, pr);
919         return 0;
920 }
921
922
923 int handle_release(struct peerd *peer, struct peer_req *pr)
924 {
925         int r = spawnthread(peer, pr, unlock_op);
926         if (r < 0)
927                 fail(pr->peer, pr);
928         return 0;
929 }
930
931 int custom_peer_init(struct peerd *peer, int argc, char *argv[])
932 {
933         int i, j;
934         struct radosd *rados = malloc(sizeof(struct radosd));
935         struct rados_io *rio;
936         if (!rados) {
937                 perror("malloc");
938                 return -1;
939         }
940         rados->pool[0] = 0;
941
942         BEGIN_READ_ARGS(argc, argv);
943         READ_ARG_STRING("--pool", rados->pool, MAX_POOL_NAME);
944         END_READ_ARGS();
945
946         if (!rados->pool[0]){
947                 XSEGLOG2(&lc, E , "Pool must be provided");
948                 free(rados);
949                 usage(argv[0]);
950                 return -1;
951         }
952
953         if (rados_create(&rados->cluster, NULL) < 0) {
954                 XSEGLOG2(&lc, E, "Rados create failed!");
955                 return -1;
956         }
957         if (rados_conf_read_file(rados->cluster, NULL) < 0){
958                 XSEGLOG2(&lc, E, "Error reading rados conf files!");
959                 return -1;
960         }
961         if (rados_connect(rados->cluster) < 0) {
962                 XSEGLOG2(&lc, E, "Rados connect failed!");
963                 rados_shutdown(rados->cluster);
964                 free(rados);
965                 return 0;
966         }
967         if (rados_pool_lookup(rados->cluster, rados->pool) < 0) {
968                 XSEGLOG2(&lc, E, "Pool does not exists. Try creating it first");
969                 rados_shutdown(rados->cluster);
970                 free(rados);
971                 return -1;
972                 /*
973                 if (rados_pool_create(rados->cluster, rados->pool) < 0){
974                         XSEGLOG2(&lc, E, "Couldn't create pool %s", rados->pool);
975                         rados_shutdown(rados->cluster);
976                         free(rados);
977                         return -1;
978                 }
979                 XSEGLOG2(&lc, I, "Pool created.");
980                 */
981
982         }
983         if (rados_ioctx_create(rados->cluster, rados->pool, &(rados->ioctx)) < 0){
984                 XSEGLOG2(&lc, E, "ioctx create problem.");
985                 rados_shutdown(rados->cluster);
986                 free(rados);
987                 return -1;
988         }
989         peer->priv = (void *) rados;
990         for (i = 0; i < peer->nr_ops; i++) {
991                 rio = malloc(sizeof(struct rados_io));
992                 if (!rio) {
993                         //ugly
994                         //is this really necessary?
995                         for (j = 0; j < i; j++) {
996                                 free(peer->peer_reqs[j].priv);
997                         }
998                         free(rados);
999                         perror("malloc");
1000                         return -1;
1001                 }
1002                 rio->buf = 0;
1003                 rio->read = 0;
1004                 rio->size = 0;
1005                 rio->second_name = 0;
1006                 rio->watch_handle = 0;
1007                 pthread_cond_init(&rio->cond, NULL);
1008                 pthread_mutex_init(&rio->m, NULL);
1009                 peer->peer_reqs[i].priv = (void *) rio;
1010         }
1011         return 0;
1012 }
1013
/*
 * Hook for peer-specific argument parsing.
 * There is nothing to parse for now: both arguments are ignored and
 * 0 is returned.
 */
int custom_arg_parse(int argc, const char *argv[])
{
        (void) argc;
        (void) argv;

        return 0;
}
1019
/*
 * Peer-specific finalisation hook. Currently a no-op: peer is not used
 * and nothing is released here.
 */
void custom_peer_finalize(struct peerd *peer)
{
        (void) peer;
}
1024
1025 int dispatch(struct peerd *peer, struct peer_req *pr, struct xseg_request *req,
1026                 enum dispatch_reason reason)
1027 {
1028         struct rados_io *rio = (struct rados_io *) (pr->priv);
1029         char *target = xseg_get_target(peer->xseg, pr->req);
1030         unsigned int end = (pr->req->targetlen > MAX_OBJ_NAME) ?
1031                 MAX_OBJ_NAME : pr->req->targetlen;
1032
1033         if (reason == dispatch_accept) {
1034                 strncpy(rio->obj_name, target, end);
1035                 rio->obj_name[end] = 0;
1036                 rio->state = ACCEPTED;
1037                 rio->read = 0;
1038         }
1039
1040         switch (pr->req->op){
1041                 case X_READ:
1042                         handle_read(peer, pr); break;
1043                 case X_WRITE:
1044                         handle_write(peer, pr); break;
1045                 case X_DELETE:
1046                         if (canDefer(peer))
1047                                 defer_request(peer, pr);
1048                         else
1049                                 handle_delete(peer, pr);
1050                         break;
1051                 case X_INFO:
1052                         if (canDefer(peer))
1053                                 defer_request(peer, pr);
1054                         else
1055                                 handle_info(peer, pr);
1056                         break;
1057                 case X_COPY:
1058                         if (canDefer(peer))
1059                                 defer_request(peer, pr);
1060                         else
1061                                 handle_copy(peer, pr);
1062                         break;
1063                 case X_ACQUIRE:
1064                         handle_acquire(peer, pr); break;
1065                 case X_RELEASE:
1066                         handle_release(peer, pr); break;
1067                 case X_HASH:
1068                         handle_hash(peer, pr); break;
1069
1070                 default:
1071                         fail(peer, pr);
1072         }
1073         return 0;
1074 }