add volume creation capability to mapper
[archipelago] / xseg / peers / user / mt-sosd.c
1 #include <stdio.h>
2 #include <stdlib.h>
3 #include <unistd.h>
4 #include <xseg/xseg.h>
5 #include <mpeer.h>
6 #include <rados/librados.h>
7 #include <xseg/protocol.h>
8
9 #define MAX_POOL_NAME 64
10 #define MAX_OBJ_NAME 256
11
12 extern struct log_ctx lc;
13
14 enum rados_state {
15         ACCEPTED = 0,
16         PENDING = 1
17 };
18
19 struct radosd {
20         rados_t cluster;
21         rados_ioctx_t ioctx;
22         char pool[MAX_POOL_NAME + 1];
23 };
24
25 struct rados_io{
26         char obj_name[MAX_OBJ_NAME + 1];
27         enum rados_state state;
28 };
29
30 void rados_ack_cb(rados_completion_t c, void *arg)
31 {
32         struct peer_req *pr = (struct peer_req*) arg;
33         struct peerd *peer = pr->peer;
34         int ret = rados_aio_get_return_value(c);
35         pr->retval = ret;
36         rados_aio_release(c);
37         dispatch(peer, pr, pr->req, dispatch_internal);
38 }
39
40 void rados_commit_cb(rados_completion_t c, void *arg)
41 {
42         struct peer_req *pr = (struct peer_req*) arg;
43         struct peerd *peer = pr->peer;
44         int ret = rados_aio_get_return_value(c);
45         pr->retval = ret;
46         rados_aio_release(c);
47         dispatch(peer, pr, pr->req, dispatch_internal);
48 }
49
50 int do_aio_read(struct peerd *peer, struct peer_req *pr)
51 {
52         struct radosd *rados = (struct radosd *) peer->priv;
53         struct xseg_request *req = pr->req;
54         struct rados_io *rio = (struct rados_io *) pr->priv;
55         char *data = xseg_get_data(peer->xseg, pr->req);
56         int r;
57
58         rados_completion_t rados_compl;
59         r = rados_aio_create_completion(pr, rados_ack_cb, NULL, &rados_compl);
60         if (r < 0) 
61                 return -1;
62         r = rados_aio_read(rados->ioctx, rio->obj_name, rados_compl, 
63                         data + req->serviced,
64                         req->size - req->serviced,
65                         req->offset + req->serviced);
66         if (r < 0) {
67                 rados_aio_release(rados_compl);
68                 return -1;
69         }
70         return 0;
71 }
72
73 int do_aio_write(struct peerd *peer, struct peer_req *pr)
74 {
75         struct radosd *rados = (struct radosd *) peer->priv;
76         struct xseg_request *req = pr->req;
77         struct rados_io *rio = (struct rados_io *) pr->priv;
78         char *data = xseg_get_data(peer->xseg, pr->req);
79         int r;
80
81         rados_completion_t rados_compl;
82         r = rados_aio_create_completion(pr, NULL, rados_commit_cb, &rados_compl);
83         if (r < 0) 
84                 return -1;
85         r = rados_aio_write(rados->ioctx, rio->obj_name, rados_compl, 
86                         data + req->serviced,
87                         req->size - req->serviced,
88                         req->offset + req->serviced);
89         if (r < 0) {
90                 rados_aio_release(rados_compl);
91                 return -1;
92         }
93         return 0;
94 }
95
96 int handle_delete(struct peerd *peer, struct peer_req *pr)
97 {
98         int r;
99         struct radosd *rados = (struct radosd *) peer->priv;
100         struct rados_io *rio = (struct rados_io *) pr->priv;
101         
102         //log_pr("delete start", pr);
103         XSEGLOG2(&lc, I, "Deleting %s", rio->obj_name);
104         r = rados_remove(rados->ioctx, rio->obj_name);
105         if (r < 0) {
106                 pr->retval = r;
107                 XSEGLOG2(&lc, E, "Deletion of %s failed", rio->obj_name);
108                 fail(peer, pr);
109         }
110         else {
111                 pr->retval = 0;
112                 XSEGLOG2(&lc, E, "Deletion of %s completed", rio->obj_name);
113                 complete(peer, pr);
114         }
115         return 0;
116 }
117
118 int handle_info(struct peerd *peer, struct peer_req *pr)
119 {
120         uint64_t size;
121         time_t pmtime;
122         int r;
123         struct xseg_request *req = pr->req;
124         struct radosd *rados = (struct radosd *) peer->priv;
125         struct rados_io *rio = (struct rados_io *) pr->priv;
126         char *req_data = xseg_get_data(peer->xseg, req);
127         struct xseg_reply_info *xinfo = req_data;
128
129         XSEGLOG2(&lc, I, "Getting info of %s", rio->obj_name);  
130         r = rados_stat(rados->ioctx, rio->obj_name, &size, &pmtime);
131         if (r < 0) {
132                 pr->retval = r;
133                 XSEGLOG2(&lc, I, "Getting info of %s failed", rio->obj_name);   
134                 fail(peer, pr);
135         }
136         else {
137                 xinfo->size = size;
138                 pr->retval = sizeof(uint64_t);
139                 XSEGLOG2(&lc, I, "Getting info of %s completed", rio->obj_name);        
140                 complete(peer,pr);
141         }
142         return 0;
143 }
144
145 //FIXME req->state no longer apply
146 int handle_read(struct peerd *peer, struct peer_req *pr)
147 {
148         struct rados_io *rio = (struct rados_io *) (pr->priv);
149         struct xseg_request *req = pr->req;
150         char *data;
151         if (rio->state == ACCEPTED) {
152                 if (!req->size) {
153                         complete(peer, pr);
154                         return 0;
155                 }
156                 //should we ensure req->op = X_READ ?
157                 rio->state = PENDING;
158                 XSEGLOG2(&lc, I, "Reading %s", rio->obj_name);
159                 if (do_aio_read(peer, pr) < 0) {
160                         XSEGLOG2(&lc, I, "Reading of %s failed on do_aio_read", rio->obj_name);
161                         fail(peer, pr);
162                 }
163         }
164         else if (rio->state == PENDING) {
165                 XSEGLOG2(&lc, I, "Reading of %s callback", rio->obj_name);
166                 data = xseg_get_data(peer->xseg, pr->req);
167                 if (pr->retval > 0) 
168                         req->serviced += pr->retval;
169                 else if (pr->retval == 0) {
170                         XSEGLOG2(&lc, I, "Reading of %s reached end of file at %llu bytes. Zeroing out rest", 
171                                                 rio->obj_name, (unsigned long long) req->serviced);
172                         /* reached end of object. zero out rest of data
173                          * requested from this object
174                          */
175                         memset(data + req->serviced, 0, req->datalen - req->serviced);
176                         req->serviced = req->datalen ;
177                 }
178                 else if (pr->retval == -2) {
179                         XSEGLOG2(&lc, I, "Reading of %s return -2. Zeroing out data", rio->obj_name);
180                         /* object not found. return zeros instead */
181                         memset(data, 0, req->datalen);
182                         req->serviced = req->datalen;
183                 }
184                 else {
185                         XSEGLOG2(&lc, E, "Reading of %s failed", rio->obj_name);
186                         /* pr->retval < 0 && pr->retval != -2 */
187                         fail(peer, pr);
188                         return 0;
189                 }
190                 if (req->serviced >= req->datalen) {
191                         XSEGLOG2(&lc, I, "Reading of %s completed", rio->obj_name);
192                         complete(peer, pr);
193                         return 0;
194                 }
195
196                 if (!req->size) {
197                         /* should not happen */
198                         fail(peer, pr);
199                         return 0;
200                 }
201                 //TODO assert req->op == X_READ
202                 
203                 /* resubmit */
204                 XSEGLOG2(&lc, I, "Resubmitting read of %s", rio->obj_name);
205                 if (do_aio_read(peer, pr) < 0) {
206                         XSEGLOG2(&lc, E, "Reading of %s failed on do_aio_read", rio->obj_name);
207                         fail(peer, pr);
208                 }
209         }
210         else {
211                 /* should not reach this */
212                 printf("read request reached this\n");
213                 fail(peer, pr);
214         }
215         return 0;
216 }
217
218 int handle_write(struct peerd *peer, struct peer_req *pr)
219 {
220         struct rados_io *rio = (struct rados_io *) (pr->priv);
221         struct xseg_request *req = pr->req;
222         if (rio->state == ACCEPTED) {
223                 if (!req->size) {
224                         // for future use
225                         if (req->flags & XF_FLUSH) {
226                                 complete(peer, pr);
227                                 return 0;
228                         }
229                         else {
230                                 complete(peer, pr);
231                                 return 0;
232                         }
233                 }
234                 //should we ensure req->op = X_READ ?
235                 rio->state = PENDING;
236                 XSEGLOG2(&lc, I, "Writing %s", rio->obj_name);
237                 if (do_aio_write(peer, pr) < 0) {
238                         XSEGLOG2(&lc, E, "Writing of %s failed on do_aio_write", rio->obj_name);
239                         fail(peer, pr);
240                 }
241         }
242         else if (rio->state == PENDING) {
243                 /* rados writes return 0 if write succeeded or < 0 if failed
244                  * no resubmission occurs
245                  */
246                 XSEGLOG2(&lc, I, "Writing of %s callback", rio->obj_name);
247                 if (pr->retval == 0) {
248                         XSEGLOG2(&lc, I, "Writing of %s completed", rio->obj_name);
249                         req->serviced = req->datalen;
250                         complete(peer, pr);
251                         return 0;
252                 }
253                 else {
254                         XSEGLOG2(&lc, I, "Writing of %s failed", rio->obj_name);
255                         fail(peer, pr);
256                         return 0;
257                 }
258         }
259         else {
260                 /* should not reach this */
261                 printf("write request reached this\n");
262                 fail(peer, pr);
263         }
264         return 0;
265 }
266
267 int handle_copy(struct peerd *peer, struct peer_req *pr)
268 {
269         struct radosd *rados = (struct radosd *) peer->priv;
270         struct xseg_request *req = pr->req;
271         struct rados_io *rio = (struct rados_io *) pr->priv;
272         int r, sum;
273         char *buf, src_name[MAX_OBJ_NAME + 1];
274         struct xseg_request_copy *xcopy = xseg_get_data(peer->xseg, req);
275         unsigned int end = (xcopy->targetlen > MAX_OBJ_NAME) ? MAX_OBJ_NAME : xcopy->targetlen;
276
277         strncpy(src_name, xcopy->target, end);
278         src_name[end] = 0;
279
280         req->serviced = 0;
281         buf = malloc(req->size);
282         if (!buf) {
283                 fail(peer, pr);
284                 return -1;
285         }
286         XSEGLOG2(&lc, I, "Copy of object %s to object %s started", src_name, rio->obj_name);
287         sum = 0;
288         do {
289                 r = rados_read(rados->ioctx, src_name, buf, req->size, 0);
290                 if (r < 0){
291                         XSEGLOG2(&lc, E, "Read of object %s failed", src_name);
292                         goto out_fail;
293                 }
294                 else if (r == 0) {
295                         memset(buf+r, 0, req->size - r);
296                         sum = req->size;
297                 } else 
298                         sum += r;
299         } while (sum < req->size);
300         XSEGLOG2(&lc, D, "Read of object %s Completed", src_name);
301
302         r = rados_write_full(rados->ioctx, rio->obj_name, buf, req->size);
303         if (r < 0){
304                 XSEGLOG2(&lc, E, "Read of object %s failed", rio->obj_name);
305                 goto out_fail;
306         }
307         
308         free(buf);
309         req->serviced = req->size;
310         XSEGLOG2(&lc, I, "Copy of object %s to object %s completed", src_name, rio->obj_name);
311         complete(peer, pr);
312         return 0;
313
314 out_fail:
315         free(buf);
316         pr->retval = -1;
317         XSEGLOG2(&lc, E, "Copy of object %s to object %s failed", src_name, rio->obj_name);
318         fail(peer, pr);
319         return 0;
320 }
321
322 int handle_open(struct peerd *peer, struct peer_req *pr)
323 {
324         /* FIXME to be implemented */
325         complete(peer, pr);
326         return 0;
327 }
328
329
330 int handle_close(struct peerd *peer, struct peer_req *pr)
331 {
332         /* FIXME to be implemented */
333         complete(peer, pr);
334         return 0;
335 }
336
337 int custom_peer_init(struct peerd *peer, int argc, char *argv[])
338 {
339         int i, j;
340         struct radosd *rados = malloc(sizeof(struct radosd));
341         struct rados_io *rio;
342         if (!rados) {
343                 perror("malloc");
344                 return -1;
345         }
346         rados->pool[0] = 0;
347         for (i = 0; i < argc; i++) {
348                 if (!strcmp(argv[i], "--pool") && (i+1) < argc){
349                         strncpy(rados->pool, argv[i+1], MAX_POOL_NAME);
350                         rados->pool[MAX_POOL_NAME] = 0;
351                         i += 1;
352                         continue;
353                 }
354         }
355         if (!rados->pool[0]){
356                 free(rados);
357                 return -1;
358         }
359
360         if (rados_create(&rados->cluster, NULL) < 0) {
361                 printf("Rados create failed!\n");
362                 return -1;
363         }
364         if (rados_conf_read_file(rados->cluster, NULL) < 0){
365                 printf("Error reading rados conf files!\n");
366                 return -1;
367         }
368         if (rados_connect(rados->cluster) < 0) {
369                 printf("Rados connect failed!\n");
370                 rados_shutdown(rados->cluster);
371                 free(rados);
372                 return 0;
373         }
374         if (rados_pool_lookup(rados->cluster, rados->pool) < 0) {
375                 printf( "Pool does not exists. I will try to create it\n");
376                 if (rados_pool_create(rados->cluster, rados->pool) < 0){
377                         printf("Couldn't create pool!\n");
378                         rados_shutdown(rados->cluster);
379                         free(rados);
380                         return -1;
381                 }
382                 printf("Pool created.\n");
383         }
384         if (rados_ioctx_create(rados->cluster, rados->pool, &(rados->ioctx)) < 0) {
385                 printf("ioctx create problem.\n");
386                 rados_shutdown(rados->cluster);
387                 free(rados);
388                 return -1;
389         }
390         peer->priv = (void *) rados;
391         for (i = 0; i < peer->nr_ops; i++) {
392                 rio = malloc(sizeof(struct rados_io));
393                 if (!rio) {
394                         //ugly
395                         for (j = 0; j < i; j++) {
396                                 free(peer->peer_reqs[j].priv);
397                         }
398                         free(rados);
399                         perror("malloc");
400                         return -1;
401                 }
402                 peer->peer_reqs[i].priv = (void *) rio;
403         }
404         return 0;
405 }
406
407 // nothing to do here for now
408 int custom_arg_parse(int argc, const char *argv[])
409 {
410         return 0;
411 }
412
413 int dispatch(struct peerd *peer, struct peer_req *pr, struct xseg_request *req,
414                 enum dispatch_reason reason)
415 {
416         struct rados_io *rio = (struct rados_io *) (pr->priv);
417         char *target = xseg_get_target(peer->xseg, pr->req);
418         unsigned int end = (pr->req->targetlen > MAX_OBJ_NAME) ? MAX_OBJ_NAME : pr->req->targetlen;
419         strncpy(rio->obj_name, target, end);
420         rio->obj_name[end] = 0;
421         //log_pr("dispatch", pr);
422         if (reason == dispatch_accept)
423                 rio->state = ACCEPTED;
424
425         switch (pr->req->op){
426                 case X_READ:
427                         handle_read(peer, pr); break;
428                 case X_WRITE: 
429                         handle_write(peer, pr); break;
430                 case X_DELETE:
431                         if (canDefer(peer))
432                                 defer_request(peer, pr);
433                         else
434                                 handle_delete(peer, pr);
435                         break;
436                 case X_INFO:
437                         if (canDefer(peer))
438                                 defer_request(peer, pr);
439                         else
440                                 handle_info(peer, pr);
441                         break;
442                 case X_COPY:
443                         if (canDefer(peer))
444                                 defer_request(peer, pr);
445                         else
446                                 handle_copy(peer, pr);
447                         break;
448                 case X_OPEN:
449                         handle_open(peer, pr); break;
450                 case X_CLOSE:
451                         handle_close(peer, pr); break;
452
453                 default:
454                         fail(peer, pr);
455         }
456         return 0;
457 }