fix pfiled malloc issue
[archipelago] / xseg / peers / user / mt-sosd.c
1 #include <stdio.h>
2 #include <stdlib.h>
3 #include <unistd.h>
4 #include <xseg/xseg.h>
5 #include <mpeer.h>
6 #include <rados/librados.h>
7
8 #define MAX_POOL_NAME 64
9 #define MAX_OBJ_NAME 256
10
11 struct radosd {
12         rados_t cluster;
13         rados_ioctx_t ioctx;
14         char pool[MAX_POOL_NAME];
15 };
16
17 struct rados_io{
18         char obj_name[MAX_OBJ_NAME];
19 };
20
21 void rados_ack_cb(rados_completion_t c, void *arg)
22 {
23         struct peer_req *pr = (struct peer_req*) arg;
24         struct peerd *peer = pr->peer;
25         int ret = rados_aio_get_return_value(c);
26         pr->retval = ret;
27         rados_aio_release(c);
28         dispatch(peer, pr, pr->req);
29 }
30
31 void rados_commit_cb(rados_completion_t c, void *arg)
32 {
33         struct peer_req *pr = (struct peer_req*) arg;
34         struct peerd *peer = pr->peer;
35         int ret = rados_aio_get_return_value(c);
36         pr->retval = ret;
37         rados_aio_release(c);
38         dispatch(peer, pr, pr->req);
39 }
40
41 int do_aio_read(struct peerd *peer, struct peer_req *pr)
42 {
43         struct radosd *rados = (struct radosd *) peer->priv;
44         struct xseg_request *req = pr->req;
45         struct rados_io *rio = (struct rados_io *) pr->priv;
46         char *data = xseg_get_data(peer->xseg, pr->req);
47         int r;
48
49         rados_completion_t rados_compl;
50         r = rados_aio_create_completion(pr, rados_ack_cb, NULL, &rados_compl);
51         if (r < 0) 
52                 return -1;
53         r = rados_aio_read(rados->ioctx, rio->obj_name, rados_compl, 
54                         data + req->serviced,
55                         req->size - req->serviced,
56                         req->offset + req->serviced);
57         if (r < 0) {
58                 rados_aio_release(rados_compl);
59                 return -1;
60         }
61         return 0;
62 }
63
64 int do_aio_write(struct peerd *peer, struct peer_req *pr)
65 {
66         struct radosd *rados = (struct radosd *) peer->priv;
67         struct xseg_request *req = pr->req;
68         struct rados_io *rio = (struct rados_io *) pr->priv;
69         char *data = xseg_get_data(peer->xseg, pr->req);
70         int r;
71
72         rados_completion_t rados_compl;
73         r = rados_aio_create_completion(pr, NULL, rados_commit_cb, &rados_compl);
74         if (r < 0) 
75                 return -1;
76         r = rados_aio_write(rados->ioctx, rio->obj_name, rados_compl, 
77                         data + req->serviced,
78                         req->size - req->serviced,
79                         req->offset + req->serviced);
80         if (r < 0) {
81                 rados_aio_release(rados_compl);
82                 return -1;
83         }
84         return 0;
85 }
86
87 int handle_delete(struct peerd *peer, struct peer_req *pr)
88 {
89         int r;
90         struct radosd *rados = (struct radosd *) peer->priv;
91         struct rados_io *rio = (struct rados_io *) pr->priv;
92         
93         //log_pr("delete start", pr);
94         r = rados_remove(rados->ioctx, rio->obj_name);
95         if (r < 0) {
96                 pr->retval = r;
97                 fail(peer, pr);
98         }
99         else {
100                 pr->retval = 0;
101                 complete(peer, pr);
102         }
103         return 0;
104 }
105
106 int handle_info(struct peerd *peer, struct peer_req *pr)
107 {
108         uint64_t size;
109         time_t pmtime;
110         int r;
111         struct xseg_request *req = pr->req;
112         struct radosd *rados = (struct radosd *) peer->priv;
113         struct rados_io *rio = (struct rados_io *) pr->priv;
114         char *req_data = xseg_get_data(peer->xseg, req);
115
116         log_pr("info start", pr);
117         
118         r = rados_stat(rados->ioctx, rio->obj_name, &size, &pmtime);
119         if (r < 0) {
120                 pr->retval = r;
121                 fail(peer, pr);
122         }
123         else {
124                 *((uint64_t *) req_data) = size;
125                 pr->retval = sizeof(uint64_t);
126                 complete(peer,pr);
127         }
128         return 0;
129 }
130
131 int handle_read(struct peerd *peer, struct peer_req *pr)
132 {
133         struct xseg_request *req = pr->req;
134         char *data;
135         if (req->state == XS_ACCEPTED) {
136                 if (!req->size) {
137                         complete(peer, pr);
138                         return 0;
139                 }
140                 //should we ensure req->op = X_READ ?
141                 pending(peer, pr);
142                 log_pr("read", pr);
143                 if (do_aio_read(peer, pr) < 0) {
144                         fail(peer, pr);
145                 }
146         }
147         else if (req->state == XS_PENDING) {
148                 data = xseg_get_data(peer->xseg, pr->req);
149                 if (pr->retval > 0) 
150                         req->serviced += pr->retval;
151                 else if (pr->retval == 0) {
152                         /* reached end of object. zero out rest of data
153                          * requested from this object
154                          */
155                         memset(data, 0, req->datalen - req->serviced);
156                         req->serviced = req->datalen ;
157                 }
158                 else if (pr->retval == -2) {
159                         /* object not found. return zeros instead */
160                         memset(data, 0, req->datalen);
161                         req->serviced = req->datalen;
162                 }
163                 else {
164                         /* pr->retval < 0 && pr->retval != -2 */
165                         fail(peer, pr);
166                         return 0;
167                 }
168                 if (req->serviced >= req->datalen) {
169                         log_pr("read complete", pr);
170                         complete(peer, pr);
171                         return 0;
172                 }
173
174                 if (!req->size) {
175                         /* should not happen */
176                         fail(peer, pr);
177                         return 0;
178                 }
179                 //TODO assert req->op == X_READ
180                 
181                 /* resubmit */
182                 log_pr("read resubmit", pr);
183                 if (do_aio_read(peer, pr) < 0) {
184                         fail(peer, pr);
185                 }
186         }
187         else {
188                 /* should not reach this */
189                 printf("read request reached this\n");
190                 fail(peer, pr);
191         }
192         return 0;
193 }
194
195 int handle_write(struct peerd *peer, struct peer_req *pr)
196 {
197         struct xseg_request *req = pr->req;
198         if (req->state == XS_ACCEPTED) {
199                 if (!req->size) {
200                         // for future use
201                         if (req->flags & XF_FLUSH) {
202                                 complete(peer, pr);
203                                 return 0;
204                         }
205                         else {
206                                 complete(peer, pr);
207                                 return 0;
208                         }
209                 }
210                 //should we ensure req->op = X_READ ?
211                 pending(peer, pr);
212                 //log_pr("write", pr);
213                 if (do_aio_write(peer, pr) < 0) {
214                         fail(peer, pr);
215                 }
216         }
217         else if (req->state == XS_PENDING) {
218                 /* rados writes return 0 if write succeeded or < 0 if failed
219                  * no resubmission occurs
220                  */
221                 //log_pr("write complete", pr);
222                 if (pr->retval == 0) {
223                         req->serviced = req->datalen;
224                         complete(peer, pr);
225                         return 0;
226                 }
227                 else {
228                         fail(peer, pr);
229                         return 0;
230                 }
231         }
232         else {
233                 /* should not reach this */
234                 printf("write request reached this\n");
235                 fail(peer, pr);
236         }
237         return 0;
238 }
239
240
241 int custom_peer_init(struct peerd *peer, int argc, const char *argv[])
242 {
243         int i, j;
244         struct radosd *rados = malloc(sizeof(struct radosd));
245         struct rados_io *rio;
246         if (!rados) {
247                 perror("malloc");
248                 return -1;
249         }
250         //TODO this should be a parameter. maybe -r (from rados)?
251         strncpy(rados->pool, "xseg", MAX_POOL_NAME);
252         if (rados_create(&rados->cluster, NULL) < 0) {
253                 printf("Rados create failed!\n");
254                 return -1;
255         }
256         if (rados_conf_read_file(rados->cluster, NULL) < 0){
257                 printf("Error reading rados conf files!\n");
258                 return -1;
259         }
260         if (rados_connect(rados->cluster) < 0) {
261                 printf("Rados connect failed!\n");
262                 rados_shutdown(rados->cluster);
263                 free(rados);
264                 return 0;
265         }
266         if (rados_pool_lookup(rados->cluster, rados->pool) < 0) {
267                 printf( "Pool does not exists. I will try to create it\n");
268                 if (rados_pool_create(rados->cluster, rados->pool) < 0){
269                         printf("Couldn't create pool!\n");
270                         rados_shutdown(rados->cluster);
271                         free(rados);
272                         return -1;
273                 }
274                 printf("Pool created.\n");
275         }
276         if (rados_ioctx_create(rados->cluster, rados->pool, &(rados->ioctx)) < 0) {
277                 printf("ioctx create problem.\n");
278                 rados_shutdown(rados->cluster);
279                 free(rados);
280                 return -1;
281         }
282         peer->priv = (void *) rados;
283         for (i = 0; i < peer->nr_ops; i++) {
284                 rio = malloc(sizeof(struct rados_io));
285                 if (!rio) {
286                         //ugly
287                         for (j = 0; j < i; j++) {
288                                 free(peer->peer_reqs[j].priv);
289                         }
290                         free(rados);
291                         perror("malloc");
292                         return -1;
293                 }
294                 peer->peer_reqs[i].priv = (void *) rio;
295         }
296         return 0;
297 }
298
299 // nothing to do here for now
300 int custom_arg_parse(int argc, const char *argv[])
301 {
302         return 0;
303 }
304
305 int dispatch(struct peerd *peer, struct peer_req *pr, struct xseg_request *req)
306 {
307         struct rados_io *rio = (struct rados_io *) (pr->priv);
308         char *target = xseg_get_target(peer->xseg, pr->req);
309         unsigned int end = (pr->req->targetlen > MAX_OBJ_NAME -1 )? MAX_OBJ_NAME - 1 : pr->req->targetlen;
310         strncpy(rio->obj_name, target, end);
311         rio->obj_name[end] = 0;
312         //log_pr("dispatch", pr);
313         switch (pr->req->op){
314         case X_READ:
315                 handle_read(peer, pr); break;
316         case X_WRITE: 
317                 handle_write(peer, pr); break;
318         case X_DELETE:
319                 if (canDefer(peer))
320                         defer_request(peer, pr);
321                 else
322                         handle_delete(peer, pr);
323                 break;
324         case X_INFO:
325                 if (canDefer(peer))
326                         defer_request(peer, pr);
327                 else
328                         handle_info(peer, pr);
329                 break;
330         default:
331                 fail(peer, pr);
332         }
333         return 0;
334 }