6 #include <rados/librados.h>
8 #define MAX_POOL_NAME 64
9 #define MAX_OBJ_NAME 256
14 char pool[MAX_POOL_NAME];
18 char obj_name[MAX_OBJ_NAME];
21 void rados_ack_cb(rados_completion_t c, void *arg)
23 struct peer_req *pr = (struct peer_req*) arg;
24 struct peerd *peer = pr->peer;
25 int ret = rados_aio_get_return_value(c);
28 dispatch(peer, pr, pr->req);
31 void rados_commit_cb(rados_completion_t c, void *arg)
33 struct peer_req *pr = (struct peer_req*) arg;
34 struct peerd *peer = pr->peer;
35 int ret = rados_aio_get_return_value(c);
38 dispatch(peer, pr, pr->req);
41 int do_aio_read(struct peerd *peer, struct peer_req *pr)
43 struct radosd *rados = (struct radosd *) peer->priv;
44 struct xseg_request *req = pr->req;
45 struct rados_io *rio = (struct rados_io *) pr->priv;
46 char *data = xseg_get_data(peer->xseg, pr->req);
49 rados_completion_t rados_compl;
50 r = rados_aio_create_completion(pr, rados_ack_cb, NULL, &rados_compl);
53 r = rados_aio_read(rados->ioctx, rio->obj_name, rados_compl,
55 req->size - req->serviced,
56 req->offset + req->serviced);
58 rados_aio_release(rados_compl);
64 int do_aio_write(struct peerd *peer, struct peer_req *pr)
66 struct radosd *rados = (struct radosd *) peer->priv;
67 struct xseg_request *req = pr->req;
68 struct rados_io *rio = (struct rados_io *) pr->priv;
69 char *data = xseg_get_data(peer->xseg, pr->req);
72 rados_completion_t rados_compl;
73 r = rados_aio_create_completion(pr, NULL, rados_commit_cb, &rados_compl);
76 r = rados_aio_write(rados->ioctx, rio->obj_name, rados_compl,
78 req->size - req->serviced,
79 req->offset + req->serviced);
81 rados_aio_release(rados_compl);
87 int handle_delete(struct peerd *peer, struct peer_req *pr)
90 struct radosd *rados = (struct radosd *) peer->priv;
91 struct rados_io *rio = (struct rados_io *) pr->priv;
93 //log_pr("delete start", pr);
94 r = rados_remove(rados->ioctx, rio->obj_name);
106 int handle_info(struct peerd *peer, struct peer_req *pr)
111 struct xseg_request *req = pr->req;
112 struct radosd *rados = (struct radosd *) peer->priv;
113 struct rados_io *rio = (struct rados_io *) pr->priv;
114 char *req_data = xseg_get_data(peer->xseg, req);
116 log_pr("info start", pr);
118 r = rados_stat(rados->ioctx, rio->obj_name, &size, &pmtime);
124 *((uint64_t *) req_data) = size;
125 pr->retval = sizeof(uint64_t);
131 int handle_read(struct peerd *peer, struct peer_req *pr)
133 struct xseg_request *req = pr->req;
135 if (req->state == XS_ACCEPTED) {
140 //should we ensure req->op = X_READ ?
143 if (do_aio_read(peer, pr) < 0) {
147 else if (req->state == XS_PENDING) {
148 data = xseg_get_data(peer->xseg, pr->req);
150 req->serviced += pr->retval;
151 else if (pr->retval == 0) {
152 /* reached end of object. zero out rest of data
153 * requested from this object
155 memset(data, 0, req->datalen - req->serviced);
156 req->serviced = req->datalen ;
158 else if (pr->retval == -2) {
159 /* object not found. return zeros instead */
160 memset(data, 0, req->datalen);
161 req->serviced = req->datalen;
164 /* pr->retval < 0 && pr->retval != -2 */
168 if (req->serviced >= req->datalen) {
169 log_pr("read complete", pr);
175 /* should not happen */
179 //TODO assert req->op == X_READ
182 log_pr("read resubmit", pr);
183 if (do_aio_read(peer, pr) < 0) {
188 /* should not reach this */
189 printf("read request reached this\n");
195 int handle_write(struct peerd *peer, struct peer_req *pr)
197 struct xseg_request *req = pr->req;
198 if (req->state == XS_ACCEPTED) {
201 if (req->flags & XF_FLUSH) {
210 //should we ensure req->op = X_READ ?
212 //log_pr("write", pr);
213 if (do_aio_write(peer, pr) < 0) {
217 else if (req->state == XS_PENDING) {
218 /* rados writes return 0 if write succeeded or < 0 if failed
219 * no resubmission occurs
221 //log_pr("write complete", pr);
222 if (pr->retval == 0) {
223 req->serviced = req->datalen;
233 /* should not reach this */
234 printf("write request reached this\n");
241 int custom_peer_init(struct peerd *peer, int argc, const char *argv[])
244 struct radosd *rados = malloc(sizeof(struct radosd));
245 struct rados_io *rio;
250 //TODO this should be a parameter. maybe -r (from rados)?
251 strncpy(rados->pool, "xseg", MAX_POOL_NAME);
252 if (rados_create(&rados->cluster, NULL) < 0) {
253 printf("Rados create failed!\n");
256 if (rados_conf_read_file(rados->cluster, NULL) < 0){
257 printf("Error reading rados conf files!\n");
260 if (rados_connect(rados->cluster) < 0) {
261 printf("Rados connect failed!\n");
262 rados_shutdown(rados->cluster);
266 if (rados_pool_lookup(rados->cluster, rados->pool) < 0) {
267 printf( "Pool does not exists. I will try to create it\n");
268 if (rados_pool_create(rados->cluster, rados->pool) < 0){
269 printf("Couldn't create pool!\n");
270 rados_shutdown(rados->cluster);
274 printf("Pool created.\n");
276 if (rados_ioctx_create(rados->cluster, rados->pool, &(rados->ioctx)) < 0) {
277 printf("ioctx create problem.\n");
278 rados_shutdown(rados->cluster);
282 peer->priv = (void *) rados;
283 for (i = 0; i < peer->nr_ops; i++) {
284 rio = malloc(sizeof(struct rados_io));
287 for (j = 0; j < i; j++) {
288 free(peer->peer_reqs[j].priv);
294 peer->peer_reqs[i].priv = (void *) rio;
299 // nothing to do here for now
300 int custom_arg_parse(int argc, const char *argv[])
305 int dispatch(struct peerd *peer, struct peer_req *pr, struct xseg_request *req)
307 struct rados_io *rio = (struct rados_io *) (pr->priv);
308 char *target = xseg_get_target(peer->xseg, pr->req);
309 unsigned int end = (pr->req->targetlen > MAX_OBJ_NAME -1 )? MAX_OBJ_NAME - 1 : pr->req->targetlen;
310 strncpy(rio->obj_name, target, end);
311 rio->obj_name[end] = 0;
312 //log_pr("dispatch", pr);
313 switch (pr->req->op){
315 handle_read(peer, pr); break;
317 handle_write(peer, pr); break;
320 defer_request(peer, pr);
322 handle_delete(peer, pr);
326 defer_request(peer, pr);
328 handle_info(peer, pr);