make mt-sosd use async remove, stat, copy
[archipelago] / xseg / peers / user / mt-sosd.c
#include <errno.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <xseg/xseg.h>
#include <peer.h>
#include <rados/librados.h>
#include <xseg/protocol.h>
8
/* Longest pool name kept from --pool; buffers add one byte for the NUL. */
#define MAX_POOL_NAME (64)
/* Object names are xseg targets, so they share the xseg target length cap. */
#define MAX_OBJ_NAME XSEG_MAX_TARGETLEN
11
/*
 * Progress marker stored in struct rados_io for each request.
 * dispatch() resets it to ACCEPTED when a request is first accepted; the
 * handlers move it to PENDING (delete/info), READING or WRITING before
 * issuing an asynchronous RADOS op and inspect it again when the completion
 * callback re-dispatches the request.
 */
enum rados_state {
	ACCEPTED,	/* 0 */
	PENDING,	/* 1 */
	READING,	/* 2 */
	WRITING		/* 3 */
};
18
19 struct radosd {
20         rados_t cluster;
21         rados_ioctx_t ioctx;
22         char pool[MAX_POOL_NAME + 1];
23 };
24
25 struct rados_io{
26         char obj_name[MAX_OBJ_NAME + 1];
27         enum rados_state state;
28         uint64_t size;
29         char *src_name, *buf;
30         uint64_t read;
31
32 };
33
34 void rados_ack_cb(rados_completion_t c, void *arg)
35 {
36         struct peer_req *pr = (struct peer_req*) arg;
37         struct peerd *peer = pr->peer;
38         int ret = rados_aio_get_return_value(c);
39         pr->retval = ret;
40         rados_aio_release(c);
41         dispatch(peer, pr, pr->req, dispatch_internal);
42 }
43
44 void rados_commit_cb(rados_completion_t c, void *arg)
45 {
46         struct peer_req *pr = (struct peer_req*) arg;
47         struct peerd *peer = pr->peer;
48         int ret = rados_aio_get_return_value(c);
49         pr->retval = ret;
50         rados_aio_release(c);
51         dispatch(peer, pr, pr->req, dispatch_internal);
52 }
53
54 static int do_aio_generic(struct peerd *peer, struct peer_req *pr, uint32_t op,
55                 char *target, char *buf, uint64_t size, uint64_t offset)
56 {
57         struct radosd *rados = (struct radosd *) peer->priv;
58         struct rados_io *rio = (struct rados_io *) pr->priv;
59         int r;
60
61         rados_completion_t rados_compl;
62         switch (op) {
63                 case X_READ:
64                         r = rados_aio_create_completion(pr, rados_ack_cb, NULL, &rados_compl);
65                         if (r < 0)
66                                 return -1;
67                         r = rados_aio_read(rados->ioctx, target, rados_compl,
68                                         buf, size, offset);
69                         break;
70                 case X_WRITE:
71                         r = rados_aio_create_completion(pr, NULL, rados_commit_cb, &rados_compl);
72                         if (r < 0)
73                                 return -1;
74                         r = rados_aio_write(rados->ioctx, target, rados_compl,
75                                         buf, size, offset);
76                         break;
77                 case X_DELETE:
78                         r = rados_aio_create_completion(pr, rados_ack_cb, NULL, &rados_compl);
79                         if (r < 0)
80                                 return -1;
81                         r = rados_aio_remove(rados->ioctx, target, rados_compl);
82                         break;
83                 case X_INFO:
84                         r = rados_aio_create_completion(pr, rados_ack_cb, NULL, &rados_compl);
85                         if (r < 0)
86                                 return -1;
87                         r = rados_aio_stat(rados->ioctx, target, rados_compl, &rio->size, NULL); 
88                         break;
89                 default:
90                         return -1;
91                         break;
92         }
93         if (r < 0) {
94                 rados_aio_release(rados_compl);
95         }
96         return r;
97 }
98
99 static int do_aio_read(struct peerd *peer, struct peer_req *pr)
100 {
101         struct xseg_request *req = pr->req;
102         struct rados_io *rio = (struct rados_io *) pr->priv;
103         char *data = xseg_get_data(peer->xseg, pr->req);
104
105         return do_aio_generic(peer, pr, X_READ, rio->obj_name,
106                         data + req->serviced,
107                         req->size - req->serviced,
108                         req->offset + req->serviced);
109 }
110
111 static int do_aio_write(struct peerd *peer, struct peer_req *pr)
112 {
113         struct xseg_request *req = pr->req;
114         struct rados_io *rio = (struct rados_io *) pr->priv;
115         char *data = xseg_get_data(peer->xseg, pr->req);
116         int r;
117
118         return do_aio_generic(peer, pr, X_WRITE, rio->obj_name,
119                         data + req->serviced,
120                         req->size - req->serviced,
121                         req->offset + req->serviced);
122 }
123
124 int handle_delete(struct peerd *peer, struct peer_req *pr)
125 {
126         int r;
127         struct radosd *rados = (struct radosd *) peer->priv;
128         struct rados_io *rio = (struct rados_io *) pr->priv;
129
130         if (rio->state == ACCEPTED) {
131                 XSEGLOG2(&lc, I, "Deleting %s", rio->obj_name);
132                 rio->state = PENDING;
133                 r = do_aio_generic(peer, pr, X_DELETE, rio->obj_name, NULL, 0, 0);
134                 if (r < 0) {
135                         XSEGLOG2(&lc, E, "Deletion of %s failed", rio->obj_name);
136                         fail(peer, pr);
137                 }
138         }
139         else {
140                 if (pr->retval < 0){
141                         XSEGLOG2(&lc, E, "Deletion of %s failed", rio->obj_name);
142                         fail(peer, pr);
143                 }
144                 else {
145                         XSEGLOG2(&lc, I, "Deletion of %s completed", rio->obj_name);
146                         complete(peer, pr);
147                 }
148         }
149         return 0;
150 }
151
152 int handle_info(struct peerd *peer, struct peer_req *pr)
153 {
154         int r;
155         struct xseg_request *req = pr->req;
156         struct radosd *rados = (struct radosd *) peer->priv;
157         struct rados_io *rio = (struct rados_io *) pr->priv;
158         char *req_data = xseg_get_data(peer->xseg, req);
159         struct xseg_reply_info *xinfo = (struct xseg_reply_info *)req_data;
160
161         if (rio->state == ACCEPTED) {
162                 XSEGLOG2(&lc, I, "Getting info of %s", rio->obj_name);
163                 rio->state = PENDING;
164                 r = do_aio_generic(peer, pr, X_INFO, rio->obj_name, NULL, 0, 0);
165                 if (r < 0) {
166                         XSEGLOG2(&lc, E, "Getting info of %s failed", rio->obj_name);   
167                         fail(peer, pr);
168                 }
169         }
170         else {
171                 if (pr->retval < 0){
172                         xinfo->size = 0;
173                         XSEGLOG2(&lc, E, "Getting info of %s failed", rio->obj_name);   
174                         fail(peer, pr);
175                 }
176                 else {
177                         xinfo->size = rio->size;
178                         pr->retval = sizeof(uint64_t);
179                         XSEGLOG2(&lc, I, "Getting info of %s completed", rio->obj_name);        
180                         complete(peer, pr);
181                 }
182         }
183         return 0;
184 }
185
186 int handle_read(struct peerd *peer, struct peer_req *pr)
187 {
188         struct rados_io *rio = (struct rados_io *) (pr->priv);
189         struct xseg_request *req = pr->req;
190         char *data;
191         if (rio->state == ACCEPTED) {
192                 if (!req->size) {
193                         complete(peer, pr);
194                         return 0;
195                 }
196                 rio->state = READING;
197                 XSEGLOG2(&lc, I, "Reading %s", rio->obj_name);
198                 if (do_aio_read(peer, pr) < 0) {
199                         XSEGLOG2(&lc, I, "Reading of %s failed on do_aio_read",
200                                                 rio->obj_name);
201                         fail(peer, pr);
202                 }
203         }
204         else if (rio->state == READING) {
205                 XSEGLOG2(&lc, I, "Reading of %s callback", rio->obj_name);
206                 data = xseg_get_data(peer->xseg, pr->req);
207                 if (pr->retval > 0)
208                         req->serviced += pr->retval;
209                 else if (pr->retval == 0) {
210                         XSEGLOG2(&lc, I, "Reading of %s reached end of file at "
211                                 "%llu bytes. Zeroing out rest", rio->obj_name,
212                                 (unsigned long long) req->serviced);
213                         /* reached end of object. zero out rest of data
214                          * requested from this object
215                          */
216                         memset(data + req->serviced, 0, req->datalen - req->serviced);
217                         req->serviced = req->datalen;
218                 }
219                 else if (pr->retval == -2) {
220                         XSEGLOG2(&lc, I, "Reading of %s return -2. "
221                                         "Zeroing out data", rio->obj_name);
222                         /* object not found. return zeros instead */
223                         memset(data, 0, req->datalen);
224                         req->serviced = req->datalen;
225                 }
226                 else {
227                         XSEGLOG2(&lc, E, "Reading of %s failed", rio->obj_name);
228                         /* pr->retval < 0 && pr->retval != -2 */
229                         fail(peer, pr);
230                         return 0;
231                 }
232                 if (req->serviced >= req->datalen) {
233                         XSEGLOG2(&lc, I, "Reading of %s completed", rio->obj_name);
234                         complete(peer, pr);
235                         return 0;
236                 }
237
238                 if (!req->size) {
239                         /* should not happen */
240                         fail(peer, pr);
241                         return 0;
242                 }
243                 /* resubmit */
244                 XSEGLOG2(&lc, I, "Resubmitting read of %s", rio->obj_name);
245                 if (do_aio_read(peer, pr) < 0) {
246                         XSEGLOG2(&lc, E, "Reading of %s failed on do_aio_read",
247                                         rio->obj_name);
248                         fail(peer, pr);
249                 }
250         }
251         else {
252                 /* should not reach this */
253                 printf("read request reached this\n");
254                 fail(peer, pr);
255         }
256         return 0;
257 }
258
259 int handle_write(struct peerd *peer, struct peer_req *pr)
260 {
261         struct rados_io *rio = (struct rados_io *) (pr->priv);
262         struct xseg_request *req = pr->req;
263         if (rio->state == ACCEPTED) {
264                 if (!req->size) {
265                         // for future use
266                         if (req->flags & XF_FLUSH) {
267                                 complete(peer, pr);
268                                 return 0;
269                         }
270                         else {
271                                 complete(peer, pr);
272                                 return 0;
273                         }
274                 }
275                 //should we ensure req->op = X_READ ?
276                 rio->state = WRITING;
277                 XSEGLOG2(&lc, I, "Writing %s", rio->obj_name);
278                 if (do_aio_write(peer, pr) < 0) {
279                         XSEGLOG2(&lc, E, "Writing of %s failed on do_aio_write",
280                                         rio->obj_name);
281                         fail(peer, pr);
282                 }
283         }
284         else if (rio->state == WRITING) {
285                 /* rados writes return 0 if write succeeded or < 0 if failed
286                  * no resubmission occurs
287                  */
288                 XSEGLOG2(&lc, I, "Writing of %s callback", rio->obj_name);
289                 if (pr->retval == 0) {
290                         XSEGLOG2(&lc, I, "Writing of %s completed", rio->obj_name);
291                         req->serviced = req->datalen;
292                         complete(peer, pr);
293                         return 0;
294                 }
295                 else {
296                         XSEGLOG2(&lc, E, "Writing of %s failed", rio->obj_name);
297                         fail(peer, pr);
298                         return 0;
299                 }
300         }
301         else {
302                 /* should not reach this */
303                 printf("write request reached this\n");
304                 fail(peer, pr);
305         }
306         return 0;
307 }
308
309 int handle_copy(struct peerd *peer, struct peer_req *pr)
310 {
311         struct radosd *rados = (struct radosd *) peer->priv;
312         struct xseg_request *req = pr->req;
313         struct rados_io *rio = (struct rados_io *) pr->priv;
314         int r;
315         struct xseg_request_copy *xcopy = (struct xseg_request_copy *)xseg_get_data(peer->xseg, req);
316
317         if (rio->state == ACCEPTED){
318                 XSEGLOG2(&lc, I, "Copy of object %s to object %s started",
319                                 rio->src_name, rio->obj_name);
320                 if (!req->size) {
321                         complete(peer, pr); //or fail?
322                         return 0;
323                 }
324
325                 rio->src_name = malloc(MAX_OBJ_NAME + 1);
326                 if (!rio->src_name){
327                         fail(peer, pr);
328                         return -1;
329                 }
330                 //NULL terminate or fail if targetlen > MAX_OBJ_NAME ?
331                 unsigned int end = (xcopy->targetlen > MAX_OBJ_NAME) ? MAX_OBJ_NAME : xcopy->targetlen;
332                 strncpy(rio->src_name, xcopy->target, end);
333                 rio->src_name[end] = 0;
334
335                 rio->buf = malloc(req->size);
336                 if (!rio->buf) {
337                         r = -1;
338                         goto out_src;
339                 }
340
341                 rio->state = READING;
342                 rio->read = 0;
343                 XSEGLOG2(&lc, I, "Reading %s", rio->src_name);
344                 if (do_aio_generic(peer, pr, X_READ, rio->src_name, rio->buf + rio->read,
345                         req->size - rio->read, req->offset + rio->read) < 0) {
346                         XSEGLOG2(&lc, I, "Reading of %s failed on do_aio_read", rio->obj_name);
347                         fail(peer, pr);
348                         r = -1;
349                         goto out_buf;
350                 }
351         }
352         else if (rio->state == READING){
353                 XSEGLOG2(&lc, I, "Reading of %s callback", rio->obj_name);
354                 if (pr->retval > 0)
355                         rio->read += pr->retval;
356                 else if (pr->retval == 0) {
357                         XSEGLOG2(&lc, I, "Reading of %s reached end of file at "
358                                 "%llu bytes. Zeroing out rest", rio->obj_name,
359                                 (unsigned long long) req->serviced);
360                         memset(rio->buf + rio->read, 0, req->size - rio->read);
361                         rio->read = req->size ;
362                 }
363                 else {
364                         XSEGLOG2(&lc, E, "Reading of %s failed", rio->src_name);
365                         r = -1;
366                         goto out_buf;
367                 }
368
369                 if (rio->read >= req->size) {
370                         XSEGLOG2(&lc, I, "Reading of %s completed", rio->obj_name);
371                         //do_aio_write
372                         rio->state = WRITING;
373                         XSEGLOG2(&lc, I, "Writing %s", rio->obj_name);
374                         if (do_aio_generic(peer, pr, X_WRITE, rio->obj_name,
375                                         rio->buf, req->size, req->offset) < 0) {
376                                 XSEGLOG2(&lc, E, "Writing of %s failed on do_aio_write", rio->obj_name);
377                                 r = -1;
378                                 goto out_buf;
379                         }
380                         return 0;
381                 }
382
383                 XSEGLOG2(&lc, I, "Resubmitting read of %s", rio->obj_name);
384                 if (do_aio_generic(peer, pr, X_READ, rio->src_name, rio->buf + rio->read,
385                         req->size - rio->read, req->offset + rio->read) < 0) {
386                         XSEGLOG2(&lc, E, "Reading of %s failed on do_aio_read",
387                                         rio->obj_name);
388                         r = -1;
389                         goto out_buf;
390                 }
391         }
392         else if (rio->state == WRITING){
393                 XSEGLOG2(&lc, I, "Writing of %s callback", rio->obj_name);
394                 if (pr->retval == 0) {
395                         XSEGLOG2(&lc, I, "Writing of %s completed", rio->obj_name);
396                         XSEGLOG2(&lc, I, "Copy of object %s to object %s completed", rio->src_name, rio->obj_name);
397                         req->serviced = req->size;
398                         r = 0;
399                         goto out_buf;
400                 }
401                 else {
402                         XSEGLOG2(&lc, E, "Writing of %s failed", rio->obj_name);
403                         XSEGLOG2(&lc, E, "Copy of object %s to object %s failed", rio->src_name, rio->obj_name);
404                         r = -1;
405                         goto out_buf;
406                 }
407         }
408         else {
409                 XSEGLOG2(&lc, E, "Unknown state");
410         }
411         return 0;
412
413
414 out_buf:
415         free(rio->buf);
416 out_src:
417         free(rio->src_name);
418
419         rio->buf = NULL;
420         rio->src_name = NULL;
421         rio->read = 0;
422
423         if (r < 0)
424                 fail(peer ,pr);
425         else
426                 complete(peer, pr);
427         return 0;
428 }
429
430 int handle_open(struct peerd *peer, struct peer_req *pr)
431 {
432         struct radosd *rados = (struct radosd *) peer->priv;
433         struct rados_io *rio = (struct rados_io *) (pr->priv);
434         int r = rados_lock(rados->ioctx, rio->obj_name);
435         if (r < 0){
436                 fail(peer, pr);
437         }
438         else {
439                 complete(peer, pr);
440         }
441         return 0;
442 }
443
444
445 int handle_close(struct peerd *peer, struct peer_req *pr)
446 {
447         struct radosd *rados = (struct radosd *) peer->priv;
448         struct rados_io *rio = (struct rados_io *) (pr->priv);
449         int r = rados_unlock(rados->ioctx, rio->obj_name);
450         if (r < 0){
451                 fail(peer, pr);
452         }
453         else {
454                 complete(peer, pr);
455         }
456         return 0;
457 }
458
459 int custom_peer_init(struct peerd *peer, int argc, char *argv[])
460 {
461         int i, j;
462         struct radosd *rados = malloc(sizeof(struct radosd));
463         struct rados_io *rio;
464         if (!rados) {
465                 perror("malloc");
466                 return -1;
467         }
468         rados->pool[0] = 0;
469         for (i = 0; i < argc; i++) {
470                 if (!strcmp(argv[i], "--pool") && (i+1) < argc){
471                         strncpy(rados->pool, argv[i+1], MAX_POOL_NAME);
472                         rados->pool[MAX_POOL_NAME] = 0;
473                         i += 1;
474                         continue;
475                 }
476         }
477         if (!rados->pool[0]){
478                 XSEGLOG2(&lc, E , "Pool must be provided");
479                 free(rados);
480                 return -1;
481         }
482
483         if (rados_create(&rados->cluster, NULL) < 0) {
484                 XSEGLOG2(&lc, E, "Rados create failed!");
485                 return -1;
486         }
487         if (rados_conf_read_file(rados->cluster, NULL) < 0){
488                 XSEGLOG2(&lc, E, "Error reading rados conf files!");
489                 return -1;
490         }
491         if (rados_connect(rados->cluster) < 0) {
492                 XSEGLOG2(&lc, E, "Rados connect failed!");
493                 rados_shutdown(rados->cluster);
494                 free(rados);
495                 return 0;
496         }
497         if (rados_pool_lookup(rados->cluster, rados->pool) < 0) {
498                 XSEGLOG2(&lc, I, "Pool does not exists. I will try to create it");
499                 if (rados_pool_create(rados->cluster, rados->pool) < 0){
500                         XSEGLOG2(&lc, E, "Couldn't create pool %s", rados->pool);
501                         rados_shutdown(rados->cluster);
502                         free(rados);
503                         return -1;
504                 }
505                 XSEGLOG2(&lc, I, "Pool created.");
506         }
507         if (rados_ioctx_create(rados->cluster, rados->pool, &(rados->ioctx)) < 0){
508                 XSEGLOG2(&lc, E, "ioctx create problem.");
509                 rados_shutdown(rados->cluster);
510                 free(rados);
511                 return -1;
512         }
513         peer->priv = (void *) rados;
514         for (i = 0; i < peer->nr_ops; i++) {
515                 rio = malloc(sizeof(struct rados_io));
516                 if (!rio) {
517                         //ugly
518                         //is this really necessary?
519                         for (j = 0; j < i; j++) {
520                                 free(peer->peer_reqs[j].priv);
521                         }
522                         free(rados);
523                         perror("malloc");
524                         return -1;
525                 }
526                 peer->peer_reqs[i].priv = (void *) rio;
527         }
528         return 0;
529 }
530
/* This peer defines no extra command-line options yet; always succeeds. */
int custom_arg_parse(int argc, const char *argv[])
{
	(void) argc;
	(void) argv;
	return 0;
}
536
537 int dispatch(struct peerd *peer, struct peer_req *pr, struct xseg_request *req,
538                 enum dispatch_reason reason)
539 {
540         struct rados_io *rio = (struct rados_io *) (pr->priv);
541         char *target = xseg_get_target(peer->xseg, pr->req);
542         unsigned int end = (pr->req->targetlen > MAX_OBJ_NAME) ? MAX_OBJ_NAME : pr->req->targetlen;
543         strncpy(rio->obj_name, target, end);
544         rio->obj_name[end] = 0;
545         //log_pr("dispatch", pr);
546         if (reason == dispatch_accept)
547                 rio->state = ACCEPTED;
548
549         switch (pr->req->op){
550                 case X_READ:
551                         handle_read(peer, pr); break;
552                 case X_WRITE: 
553                         handle_write(peer, pr); break;
554                 case X_DELETE:
555                         if (canDefer(peer))
556                                 defer_request(peer, pr);
557                         else
558                                 handle_delete(peer, pr);
559                         break;
560                 case X_INFO:
561                         if (canDefer(peer))
562                                 defer_request(peer, pr);
563                         else
564                                 handle_info(peer, pr);
565                         break;
566                 case X_COPY:
567                         if (canDefer(peer))
568                                 defer_request(peer, pr);
569                         else
570                                 handle_copy(peer, pr);
571                         break;
572                 case X_OPEN:
573                         handle_open(peer, pr); break;
574                 case X_CLOSE:
575                         handle_close(peer, pr); break;
576
577                 default:
578                         fail(peer, pr);
579         }
580         return 0;
581 }