add skeleton for X_COPY to mt-sosd
[archipelago] / xseg / peers / user / mt-sosd.c
1 #include <stdio.h>
2 #include <stdlib.h>
3 #include <unistd.h>
4 #include <xseg/xseg.h>
5 #include <mpeer.h>
6 #include <rados/librados.h>
7 #include <xseg/protocol.h>
8
9 #define MAX_POOL_NAME 64
10 #define MAX_OBJ_NAME 256
11
12 struct radosd {
13         rados_t cluster;
14         rados_ioctx_t ioctx;
15         char pool[MAX_POOL_NAME];
16 };
17
18 struct rados_io{
19         char obj_name[MAX_OBJ_NAME];
20 };
21
22 void rados_ack_cb(rados_completion_t c, void *arg)
23 {
24         struct peer_req *pr = (struct peer_req*) arg;
25         struct peerd *peer = pr->peer;
26         int ret = rados_aio_get_return_value(c);
27         pr->retval = ret;
28         rados_aio_release(c);
29         dispatch(peer, pr, pr->req);
30 }
31
32 void rados_commit_cb(rados_completion_t c, void *arg)
33 {
34         struct peer_req *pr = (struct peer_req*) arg;
35         struct peerd *peer = pr->peer;
36         int ret = rados_aio_get_return_value(c);
37         pr->retval = ret;
38         rados_aio_release(c);
39         dispatch(peer, pr, pr->req);
40 }
41
42 int do_aio_read(struct peerd *peer, struct peer_req *pr)
43 {
44         struct radosd *rados = (struct radosd *) peer->priv;
45         struct xseg_request *req = pr->req;
46         struct rados_io *rio = (struct rados_io *) pr->priv;
47         char *data = xseg_get_data(peer->xseg, pr->req);
48         int r;
49
50         rados_completion_t rados_compl;
51         r = rados_aio_create_completion(pr, rados_ack_cb, NULL, &rados_compl);
52         if (r < 0) 
53                 return -1;
54         r = rados_aio_read(rados->ioctx, rio->obj_name, rados_compl, 
55                         data + req->serviced,
56                         req->size - req->serviced,
57                         req->offset + req->serviced);
58         if (r < 0) {
59                 rados_aio_release(rados_compl);
60                 return -1;
61         }
62         return 0;
63 }
64
65 int do_aio_write(struct peerd *peer, struct peer_req *pr)
66 {
67         struct radosd *rados = (struct radosd *) peer->priv;
68         struct xseg_request *req = pr->req;
69         struct rados_io *rio = (struct rados_io *) pr->priv;
70         char *data = xseg_get_data(peer->xseg, pr->req);
71         int r;
72
73         rados_completion_t rados_compl;
74         r = rados_aio_create_completion(pr, NULL, rados_commit_cb, &rados_compl);
75         if (r < 0) 
76                 return -1;
77         r = rados_aio_write(rados->ioctx, rio->obj_name, rados_compl, 
78                         data + req->serviced,
79                         req->size - req->serviced,
80                         req->offset + req->serviced);
81         if (r < 0) {
82                 rados_aio_release(rados_compl);
83                 return -1;
84         }
85         return 0;
86 }
87
88 int handle_delete(struct peerd *peer, struct peer_req *pr)
89 {
90         int r;
91         struct radosd *rados = (struct radosd *) peer->priv;
92         struct rados_io *rio = (struct rados_io *) pr->priv;
93         
94         //log_pr("delete start", pr);
95         r = rados_remove(rados->ioctx, rio->obj_name);
96         if (r < 0) {
97                 pr->retval = r;
98                 fail(peer, pr);
99         }
100         else {
101                 pr->retval = 0;
102                 complete(peer, pr);
103         }
104         return 0;
105 }
106
107 int handle_info(struct peerd *peer, struct peer_req *pr)
108 {
109         uint64_t size;
110         time_t pmtime;
111         int r;
112         struct xseg_request *req = pr->req;
113         struct radosd *rados = (struct radosd *) peer->priv;
114         struct rados_io *rio = (struct rados_io *) pr->priv;
115         char *req_data = xseg_get_data(peer->xseg, req);
116         struct xseg_reply_info *xinfo = req_data;
117
118         log_pr("info start", pr);
119         
120         r = rados_stat(rados->ioctx, rio->obj_name, &size, &pmtime);
121         if (r < 0) {
122                 pr->retval = r;
123                 fail(peer, pr);
124         }
125         else {
126                 xinfo->size = size;
127                 pr->retval = sizeof(uint64_t);
128                 complete(peer,pr);
129         }
130         return 0;
131 }
132
133 int handle_read(struct peerd *peer, struct peer_req *pr)
134 {
135         struct xseg_request *req = pr->req;
136         char *data;
137         if (req->state == XS_ACCEPTED) {
138                 if (!req->size) {
139                         complete(peer, pr);
140                         return 0;
141                 }
142                 //should we ensure req->op = X_READ ?
143                 pending(peer, pr);
144                 log_pr("read", pr);
145                 if (do_aio_read(peer, pr) < 0) {
146                         fail(peer, pr);
147                 }
148         }
149         else if (req->state == XS_PENDING) {
150                 data = xseg_get_data(peer->xseg, pr->req);
151                 if (pr->retval > 0) 
152                         req->serviced += pr->retval;
153                 else if (pr->retval == 0) {
154                         /* reached end of object. zero out rest of data
155                          * requested from this object
156                          */
157                         memset(data, 0, req->datalen - req->serviced);
158                         req->serviced = req->datalen ;
159                 }
160                 else if (pr->retval == -2) {
161                         /* object not found. return zeros instead */
162                         memset(data, 0, req->datalen);
163                         req->serviced = req->datalen;
164                 }
165                 else {
166                         /* pr->retval < 0 && pr->retval != -2 */
167                         fail(peer, pr);
168                         return 0;
169                 }
170                 if (req->serviced >= req->datalen) {
171                         log_pr("read complete", pr);
172                         complete(peer, pr);
173                         return 0;
174                 }
175
176                 if (!req->size) {
177                         /* should not happen */
178                         fail(peer, pr);
179                         return 0;
180                 }
181                 //TODO assert req->op == X_READ
182                 
183                 /* resubmit */
184                 log_pr("read resubmit", pr);
185                 if (do_aio_read(peer, pr) < 0) {
186                         fail(peer, pr);
187                 }
188         }
189         else {
190                 /* should not reach this */
191                 printf("read request reached this\n");
192                 fail(peer, pr);
193         }
194         return 0;
195 }
196
197 int handle_write(struct peerd *peer, struct peer_req *pr)
198 {
199         struct xseg_request *req = pr->req;
200         if (req->state == XS_ACCEPTED) {
201                 if (!req->size) {
202                         // for future use
203                         if (req->flags & XF_FLUSH) {
204                                 complete(peer, pr);
205                                 return 0;
206                         }
207                         else {
208                                 complete(peer, pr);
209                                 return 0;
210                         }
211                 }
212                 //should we ensure req->op = X_READ ?
213                 pending(peer, pr);
214                 //log_pr("write", pr);
215                 if (do_aio_write(peer, pr) < 0) {
216                         fail(peer, pr);
217                 }
218         }
219         else if (req->state == XS_PENDING) {
220                 /* rados writes return 0 if write succeeded or < 0 if failed
221                  * no resubmission occurs
222                  */
223                 //log_pr("write complete", pr);
224                 if (pr->retval == 0) {
225                         req->serviced = req->datalen;
226                         complete(peer, pr);
227                         return 0;
228                 }
229                 else {
230                         fail(peer, pr);
231                         return 0;
232                 }
233         }
234         else {
235                 /* should not reach this */
236                 printf("write request reached this\n");
237                 fail(peer, pr);
238         }
239         return 0;
240 }
241
242 int handle_copy(struct peerd *peer, struct peer_req *pr)
243 {
244         return 0;
245 }
246
247
248 int custom_peer_init(struct peerd *peer, int argc, char *argv[])
249 {
250         int i, j;
251         struct radosd *rados = malloc(sizeof(struct radosd));
252         struct rados_io *rio;
253         if (!rados) {
254                 perror("malloc");
255                 return -1;
256         }
257         //TODO this should be a parameter. maybe -r (from rados)?
258         strncpy(rados->pool, "xseg", MAX_POOL_NAME);
259         if (rados_create(&rados->cluster, NULL) < 0) {
260                 printf("Rados create failed!\n");
261                 return -1;
262         }
263         if (rados_conf_read_file(rados->cluster, NULL) < 0){
264                 printf("Error reading rados conf files!\n");
265                 return -1;
266         }
267         if (rados_connect(rados->cluster) < 0) {
268                 printf("Rados connect failed!\n");
269                 rados_shutdown(rados->cluster);
270                 free(rados);
271                 return 0;
272         }
273         if (rados_pool_lookup(rados->cluster, rados->pool) < 0) {
274                 printf( "Pool does not exists. I will try to create it\n");
275                 if (rados_pool_create(rados->cluster, rados->pool) < 0){
276                         printf("Couldn't create pool!\n");
277                         rados_shutdown(rados->cluster);
278                         free(rados);
279                         return -1;
280                 }
281                 printf("Pool created.\n");
282         }
283         if (rados_ioctx_create(rados->cluster, rados->pool, &(rados->ioctx)) < 0) {
284                 printf("ioctx create problem.\n");
285                 rados_shutdown(rados->cluster);
286                 free(rados);
287                 return -1;
288         }
289         peer->priv = (void *) rados;
290         for (i = 0; i < peer->nr_ops; i++) {
291                 rio = malloc(sizeof(struct rados_io));
292                 if (!rio) {
293                         //ugly
294                         for (j = 0; j < i; j++) {
295                                 free(peer->peer_reqs[j].priv);
296                         }
297                         free(rados);
298                         perror("malloc");
299                         return -1;
300                 }
301                 peer->peer_reqs[i].priv = (void *) rio;
302         }
303         return 0;
304 }
305
306 // nothing to do here for now
307 int custom_arg_parse(int argc, const char *argv[])
308 {
309         return 0;
310 }
311
312 int dispatch(struct peerd *peer, struct peer_req *pr, struct xseg_request *req)
313 {
314         struct rados_io *rio = (struct rados_io *) (pr->priv);
315         char *target = xseg_get_target(peer->xseg, pr->req);
316         unsigned int end = (pr->req->targetlen > MAX_OBJ_NAME -1 )? MAX_OBJ_NAME - 1 : pr->req->targetlen;
317         strncpy(rio->obj_name, target, end);
318         rio->obj_name[end] = 0;
319         //log_pr("dispatch", pr);
320         switch (pr->req->op){
321         case X_READ:
322                 handle_read(peer, pr); break;
323         case X_WRITE: 
324                 handle_write(peer, pr); break;
325         case X_DELETE:
326                 if (canDefer(peer))
327                         defer_request(peer, pr);
328                 else
329                         handle_delete(peer, pr);
330                 break;
331         case X_INFO:
332                 if (canDefer(peer))
333                         defer_request(peer, pr);
334                 else
335                         handle_info(peer, pr);
336                 break;
337         case X_COPY:
338                 if (canDefer(peer))
339                         defer_request(peer, pr);
340                 else
341                         handle_copy(peer, pr);
342                 break;
343         default:
344                 fail(peer, pr);
345         }
346         return 0;
347 }