2 * Copyright 2012 GRNET S.A. All rights reserved.
4 * Redistribution and use in source and binary forms, with or
5 * without modification, are permitted provided that the following
8 * 1. Redistributions of source code must retain the above
9 * copyright notice, this list of conditions and the following
11 * 2. Redistributions in binary form must reproduce the above
12 * copyright notice, this list of conditions and the following
13 * disclaimer in the documentation and/or other materials
14 * provided with the distribution.
16 * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17 * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19 * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20 * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23 * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24 * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26 * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27 * POSSIBILITY OF SUCH DAMAGE.
29 * The views and conclusions contained in the software and
30 * documentation are those of the authors and should not be
31 * interpreted as representing official policies, either expressed
32 * or implied, of GRNET S.A.
37 * Giannakos Filippos <philipgian@cslab.ece.ntua.gr>
44 #include <rados/librados.h>
/*
 * Debug logging helpers.
 *
 * SOSLOG(level, fmt, args...) writes to stderr when `level` is
 * <= sos_debug_level.  The message is prefixed with the calling function's
 * name: REARRANGE turns SOSLOG's arguments into the fprintf() format
 *     "%s: " fmt "%s"
 * with the argument list
 *     __func__, args..., ""
 * The trailing "%s" always consumes the "" that SOSLOG appends, which lets
 * SOSLOG be called with a bare format string and no further arguments.
 *
 * NOTE(review): the `, ##__VA_ARGS__` comma elision is a GNU extension,
 * not ISO C.
 * NOTE(review): lines of the original macro are elided from this listing
 * (its last visible line still ends in a line continuation), so the exact
 * statement wrapper around the if is not visible here -- confirm.
 */
49 #define REARRANGE(__fun_name__, __format__, ...) __format__ "%s", __fun_name__, ##__VA_ARGS__
50 #define SOSLOG(level, ...) \
52 if (level <= sos_debug_level) { \
53 fprintf(stderr, "%s: " REARRANGE( __func__ , ## __VA_ARGS__, "" )); \
57 #define MAX_NAME_LEN 256
/*
 * MAX_NAME_LEN is the size of obj_name below, including the NUL terminator.
 * handle_async_io() and handle_sync_io() reject targets whose length is
 * >= MAX_NAME_LEN before copying them in.
 */
/*
 * The members below belong to a struct whose opening line is not shown in
 * this listing.  The callbacks cast their `arg` to struct rados_arg and use
 * ->req, ->state and ->obj_name, so this is presumably struct rados_arg:
 * the per-request context handed to librados completions -- confirm.
 */
/* The sos request this context was created for. */
68 struct sos_request *req;
/*
 * Progress marker: S_PENDING at submission, then S_ACKED or S_FAILED in
 * rados_ack_cb().
 * NOTE(review): volatile does not synchronize between threads.  If the
 * librados callbacks run on a different thread than the submitter (likely,
 * but not shown here), use _Atomic or a lock.
 */
69 volatile unsigned long state;
/* NUL-terminated copy of the request's target object name. */
70 char obj_name[MAX_NAME_LEN];
/*
 * SOSLOG verbosity threshold: messages with level <= this value are
 * printed.  The default of 0 prints only level-0 messages.
 * NOTE(review): same volatile caveat as above if this is changed while
 * other threads are logging.
 */
77 volatile unsigned int sos_debug_level=0;
/*
 * Set the SOSLOG verbosity threshold.  Subsequent SOSLOG calls whose level
 * is <= `level` are printed to stderr.
 */
79 void sos_set_debug_level(unsigned int level)
81 sos_debug_level = level;
/*
 * Forward declaration.  handle_io() (defined below) routes a request to the
 * flush, synchronous or asynchronous path.
 */
84 static int handle_io(sos_handle_t sos, struct sos_request *req);
/*
 * Create a sos handle connected to the Ceph cluster.
 *
 * Steps:
 *   - allocate the handle;
 *   - create a librados cluster handle with a NULL id (the librados
 *     default user);
 *   - read the Ceph configuration with a NULL path (per the librados docs,
 *     this searches the default locations);
 *   - connect;
 *   - make sure pool SOS_POOL exists, trying rados_pool_create() if the
 *     lookup fails;
 *   - open an I/O context on the pool.
 *
 * The connect, pool-create and ioctx-create failure paths call
 * rados_shutdown().  No shutdown is visible for the conf-read failure.
 *
 * @param cb  user notification callback.  The rados callbacks invoke
 *            sos->cb(), so this is presumably stored there; the assignment
 *            is elided from this listing.
 * @return the new handle.  The value returned on failure is elided
 *         (presumably NULL -- confirm).
 *
 * NOTE(review): no NULL check of the malloc() result is visible before
 * sos->cluster is dereferenced.
 * NOTE(review): no free(sos) is visible on the failure paths -- confirm
 * the handle is not leaked.
 * NOTE(review): the rados_create failure is reported with printf()
 * (stdout), while every other failure uses SOSLOG(0, ...) (stderr).
 * NOTE(review): any negative rados_pool_lookup() result is treated as
 * "pool missing", not only -ENOENT.
 */
86 sos_handle_t sos_init(sos_cb_t cb)
88 sos_handle_t sos = (sos_handle_t) malloc(sizeof(struct sos_handle));
/* Create the cluster handle (NULL id: librados default user). */
91 if (rados_create(&sos->cluster, NULL) < 0) {
92 printf("Rados create failed!\n");
95 SOSLOG(1, "Rados create OK \n");
/* Read the Ceph configuration from the default search locations. */
96 if (rados_conf_read_file(sos->cluster, NULL) < 0){
97 SOSLOG(0, "Error reading rados conf files!\n");
100 if (rados_connect(sos->cluster) < 0) {
101 SOSLOG(0, "Rados connect failed!\n");
102 rados_shutdown(sos->cluster);
106 SOSLOG(1, "Rados connect OK \n");
/* Ensure the storage pool exists, creating it on a failed lookup. */
107 if (rados_pool_lookup(sos->cluster, SOS_POOL) < 0) {
108 SOSLOG(0, "Pool does not exists. I will try to create it\n");
109 if (rados_pool_create(sos->cluster, SOS_POOL) < 0){
110 SOSLOG(0, "Couldn't create pool!\n");
111 rados_shutdown(sos->cluster);
115 SOSLOG(1, "Pool created.\n");
/* Open the I/O context that all subsequent requests are issued on. */
117 if (rados_ioctx_create(sos->cluster, SOS_POOL, &(sos->ioctx)) < 0) {
118 SOSLOG(0, "ioctx create problem.\n");
119 rados_shutdown(sos->cluster);
/*
 * Tear down a handle created by sos_init(): destroy the I/O context first,
 * then shut down the cluster connection.
 *
 * NOTE(review): whether the handle itself is freed is not visible in this
 * listing -- confirm it is not leaked.
 */
127 void sos_shut(sos_handle_t sos)
129 rados_ioctx_destroy(sos->ioctx);
130 rados_shutdown(sos->cluster);
/*
 * Public entry point for submitting a request.  It hands `req` to
 * handle_io(), which routes it to the flush, synchronous or asynchronous
 * path.  The declaration of r, any checks around the call and the return
 * statement are elided from this listing.
 */
135 int sos_submit(sos_handle_t sos, struct sos_request *req)
141 r =handle_io(sos, req);
/*
 * Return non-zero if `req` is a read (req->op == S_READ), 0 otherwise.
 * Used to choose between the read and write paths and to decide which
 * librados callback a request gets.
 */
151 int sos_isRead(struct sos_request *req)
153 /* for now, only S_READ requests count as reads */
154 return (req->op == S_READ);
/*
 * librados "complete" (ack) callback.  handle_async_io() registers it for
 * reads and for writes without SF_FUA.
 *
 * It does the following:
 *   - fetches the operation's return value and logs, at SOSLOG level 2,
 *     how long the request took to be acked;
 *   - for successful writes, sets req->retval to req->size, because
 *     librados write completions report 0 rather than a byte count;
 *   - reports the outcome through sos->cb(): S_NOTIFY_FAIL and
 *     rarg->state = S_FAILED on failure, otherwise S_NOTIFY_ACK and
 *     rarg->state = S_ACKED (the condition is elided; presumably ret < 0);
 *   - when no commit callback will follow, releases the completion.
 *
 * @param c    the completion that fired
 * @param arg  the struct rados_arg allocated by handle_async_io()
 *
 * NOTE(review): several lines are elided from this listing, including the
 * terminator of the retval comment.  As shown, the text from that comment
 * through the "substitute with" comment therefore lexes as one comment.
 * Also not visible: the free of rarg referred to by "no commit, so free
 * rarg" and the declarations of tv and us.
 * NOTE(review): rarg->state is written after sos->cb() returns.  If the
 * user callback can release req/rarg, or another thread reads state, the
 * store should come first -- confirm.
 * NOTE(review): handle_async_io() never registers this callback for FUA
 * writes, so for the visible call sites the
 * sos_isRead(req) || !(req->flags & SF_FUA) test is always true.
 * NOTE(review): us is printed with %lu; confirm it is declared unsigned
 * long.
 */
158 void rados_ack_cb(rados_completion_t c, void *arg)
160 int ret = rados_aio_get_return_value(c);
161 struct rados_arg *rarg = (struct rados_arg *) arg;
162 sos_handle_t sos = rarg->sos;
163 struct sos_request *req = rarg->req;
166 /* calculate time for ack */
169 gettimeofday(&tv, NULL);
170 timersub(&tv, &rarg->start, &tv);
171 us = tv.tv_sec*1000000 +tv.tv_usec;
172 SOSLOG(2, "Request %lu acked after %lu us\n", req->id, us);
174 SOSLOG(2, "Request %lu acked with ret value %d \n", req->id, ret);
176 /* rados writes return 0 upon success or an error code. so fix retval to
177 * represent bytes successfully written.
179 if (req->op == S_WRITE && ret == 0)
180 req->retval = req->size;
184 sos->cb(req, S_NOTIFY_FAIL);
185 rarg->state = S_FAILED;
188 sos->cb(req, S_NOTIFY_ACK);
189 rarg->state = S_ACKED;
191 /* substitute with rarg->istherecommit ? */
192 if (sos_isRead(req) || !(req->flags & SF_FUA)){
193 /* no commit, so free rarg */
195 rados_aio_release(c);
/*
 * librados "safe" (commit) callback.  handle_async_io() registers it for
 * writes that carry SF_FUA.
 *
 * It does the following:
 *   - fetches the operation's return value and logs the commit latency at
 *     SOSLOG level 2;
 *   - for successful writes, sets req->retval to req->size, because
 *     librados reports 0 on success;
 *   - notifies the user through sos->cb(): S_NOTIFY_FAIL when ret < 0,
 *     unless rarg->state already has S_FAILED set (so a failure is reported
 *     only once), and S_NOTIFY_COMMIT when ret >= 0;
 *   - releases the completion.
 *
 * @param c    the completion that fired
 * @param arg  the struct rados_arg allocated by handle_async_io()
 *
 * NOTE(review): lines are elided from this listing, including the
 * terminator of the retval comment.  As shown, the text from that comment
 * through "notify failure only once" therefore lexes as one comment.
 * Whether rarg is freed here is not visible.
 * NOTE(review): the S_FAILED test uses a bitwise &.  This is only correct
 * if the S_* state values are distinct bit flags; their definitions are
 * not visible.  Also, handle_async_io() registers no ack callback for FUA
 * writes, so no visible code sets S_FAILED before this runs.
 * NOTE(review): us is printed with %lu; confirm its declared type.
 */
199 void rados_commit_cb(rados_completion_t c, void *arg)
201 int ret = rados_aio_get_return_value(c);
202 struct rados_arg *rarg = (struct rados_arg *) arg;
203 sos_handle_t sos = rarg->sos;
204 struct sos_request *req = rarg->req;
207 /* calculate time for commit */
210 gettimeofday(&tv, NULL);
211 timersub(&tv, &rarg->start, &tv);
212 us = tv.tv_sec*1000000 +tv.tv_usec;
213 SOSLOG(2, "Request %lu commited after %lu us\n", req->id, us);
215 SOSLOG(2, "Request %lu commited with ret value %d \n", req->id, ret);
217 /* rados writes return 0 upon success or an error code. so fix retval to
218 * represent bytes successfully written.
220 if (req->op == S_WRITE && ret == 0)
221 req->retval = req->size;
224 if (ret < 0 && !(rarg->state & S_FAILED)) {
225 /* notify failure only once */
226 sos->cb(req, S_NOTIFY_FAIL);
228 /* discard failed commits with failed acks */
229 else if (ret >= 0 ) {
230 sos->cb(req, S_NOTIFY_COMMIT);
233 rados_aio_release(c);
/*
 * Submit `req` to librados asynchronously.
 *
 * Steps:
 *   - reject targets whose name does not fit in obj_name (length
 *     >= MAX_NAME_LEN);
 *   - allocate a struct rados_arg, copy in and NUL-terminate the object
 *     name, and record the start time;
 *   - create a completion:
 *       - FUA writes get only a commit ("safe") callback, rados_commit_cb;
 *       - reads and non-FUA writes get only an ack ("complete") callback,
 *         rados_ack_cb;
 *   - issue rados_aio_read() or rados_aio_write() for req->size bytes of
 *     req->data at req->offset;
 *   - release the completion if submission fails (the condition is
 *     elided; presumably r < 0).
 *
 * NOTE(review): the following are elided from this listing: the NULL
 * check of malloc(), the initialisation of rarg->sos and rarg->req (which
 * the callbacks read), and the return paths.
 * NOTE(review): on the submission-failure path only the completion release
 * is visible.  No callback will fire there, so confirm rarg is freed too.
 * NOTE(review): newer librados releases deprecate the separate "safe"
 * callback in favour of rados_aio_create_completion2(); check against the
 * targeted librados version.
 */
236 static int handle_async_io(sos_handle_t sos, struct sos_request *req){
238 rados_completion_t rados_compl;
239 struct rados_arg *rarg;
/* obj_name needs room for the terminating NUL. */
240 if (req->targetlen >= MAX_NAME_LEN){
244 rarg = malloc(sizeof(struct rados_arg));
250 rarg->state = S_PENDING;
/* Bounded copy; the explicit NUL below terminates it (fits due to the check above). */
251 strncpy(rarg->obj_name, req->target, req->targetlen);
252 rarg->obj_name[req->targetlen]=0;
253 SOSLOG(2, "Request %lu assigned to object[%u]: %s \n", req->id, \
254 req->targetlen, rarg->obj_name);
257 /* set time request started */
258 gettimeofday(&rarg->start, NULL);
/* FUA writes wait for the commit; everything else completes on ack. */
260 if (!sos_isRead(req) && (req->flags & SF_FUA))
261 r = rados_aio_create_completion(rarg,NULL, rados_commit_cb,
264 r = rados_aio_create_completion(rarg, rados_ack_cb, NULL,
271 if (sos_isRead(req)) {
272 r = rados_aio_read(sos->ioctx, rarg->obj_name, rados_compl,
273 req->data, req->size, req->offset);
276 r = rados_aio_write(sos->ioctx, rarg->obj_name, rados_compl,
277 req->data, req->size, req->offset);
280 rados_aio_release(rados_compl);
/*
 * Execute `req` synchronously against librados.
 *
 * Like handle_async_io(), it rejects over-long target names, allocates a
 * struct rados_arg, copies in the object name and records the start time.
 * Then:
 *   - reads use rados_read();
 *   - FUA writes issue rados_aio_write() on a callback-less completion and
 *     block in rados_aio_wait_for_safe();
 *   - non-FUA writes presumably use rados_write() (the branch structure is
 *     elided).
 *
 * NOTE(review): rarg is heap-allocated, but on the visible paths it is only
 * filled in and passed as the argument of a completion that has no
 * callbacks.  No free(rarg) is visible, so this may leak one allocation per
 * synchronous request.  Confirm against the elided lines, or use a local
 * name buffer instead.
 * NOTE(review): in the FUA path, r is overwritten by the return value of
 * rados_aio_wait_for_safe().  rados_aio_get_return_value() is not called
 * on the visible lines, so a failed write may be reported as success --
 * confirm.
 * NOTE(review): the "there is no sync FUA" comment lost its terminator in
 * this listing.  As shown, the rados_safe_write() and rados_write() calls
 * therefore lex as part of that comment.  How the aio-based FUA path and
 * rados_safe_write() relate in the real source is not visible.
 */
287 static int handle_sync_io(sos_handle_t sos, struct sos_request *req){
289 rados_completion_t rados_compl;
290 struct rados_arg *rarg;
/* obj_name needs room for the terminating NUL. */
291 if (req->targetlen>= MAX_NAME_LEN){
295 rarg = malloc(sizeof(struct rados_arg));
301 rarg->state = S_PENDING;
302 strncpy(rarg->obj_name, req->target, req->targetlen);
303 rarg->obj_name[req->targetlen]=0;
304 SOSLOG(2, "Request %lu assigned to object[%u]: %s \n", req->id, \
305 req->targetlen, rarg->obj_name);
307 /* set time request started */
308 gettimeofday(&rarg->start, NULL);
311 if (sos_isRead(req)) {
312 r = rados_read(sos->ioctx, rarg->obj_name, req->data, req->size,
/* FUA: emulate a synchronous durable write with aio + wait-for-safe. */
316 if (req->flags & SF_FUA) {
317 r = rados_aio_create_completion(rarg,NULL, NULL,
321 r = rados_aio_write(sos->ioctx, rarg->obj_name, rados_compl,
322 req->data, req->size, req->offset);
324 rados_aio_release(rados_compl);
327 r = rados_aio_wait_for_safe(rados_compl);
328 rados_aio_release(rados_compl);
329 /* there is no sync FUA. rados should be patched to support
332 r = rados_safe_write(sos->ioctx, rarg->obj_name, req->data,
333 req->size, req->offset);
337 r = rados_write(sos->ioctx, rarg->obj_name, req->data,
338 req->size, req->offset);
339 /* TODO does return value need to be fixed as in aio write ? */
/*
 * Route a request to the right librados path:
 *   - a zero-size request with SF_FLUSH set triggers rados_aio_flush() on
 *     the I/O context;
 *   - other requests go to handle_sync_io() when SF_SYNC is set, and to
 *     handle_async_io() otherwise.
 * How the flush branch exits, and the return statement, are elided from
 * this listing.  The function presumably returns r.
 */
348 static int handle_io(sos_handle_t sos, struct sos_request *req)
/* Parses as (!req->size) && (req->flags & SF_FLUSH): & binds tighter than &&. */
351 if (!req->size && req->flags & SF_FLUSH){
352 r = rados_aio_flush(sos->ioctx);
356 if (req->flags & SF_SYNC)
357 r = handle_sync_io(sos,req);
359 r = handle_async_io(sos,req);