Bump version to 0.3.5next
[archipelago] / xseg / util_libs / user / sos / sos.c
1 /*
2  * Copyright 2012 GRNET S.A. All rights reserved.
3  *
4  * Redistribution and use in source and binary forms, with or
5  * without modification, are permitted provided that the following
6  * conditions are met:
7  *
8  *   1. Redistributions of source code must retain the above
9  *      copyright notice, this list of conditions and the following
10  *      disclaimer.
11  *   2. Redistributions in binary form must reproduce the above
12  *      copyright notice, this list of conditions and the following
13  *      disclaimer in the documentation and/or other materials
14  *      provided with the distribution.
15  *
16  * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17  * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18  * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19  * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20  * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23  * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24  * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25  * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26  * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27  * POSSIBILITY OF SUCH DAMAGE.
28  *
29  * The views and conclusions contained in the software and
30  * documentation are those of the authors and should not be
31  * interpreted as representing official policies, either expressed
32  * or implied, of GRNET S.A.
33  */
34
35 /* sos.c
36  *
37  * Giannakos Filippos <philipgian@cslab.ece.ntua.gr>
38  *
39  */
40
41 #include "sos.h"
42 #include <stdlib.h>
43 #include <stdio.h>
44 #include <rados/librados.h>
45 #ifdef __SOS_TIME
46 #include <sys/time.h>
47 #endif
48
49 #define REARRANGE(__fun_name__, __format__, ...) __format__ "%s", __fun_name__, ##__VA_ARGS__
50 #define SOSLOG(level, ...)                                              \
51         do {                                                                    \
52                 if (level <=  sos_debug_level) {                                \
53                         fprintf(stderr, "%s: "  REARRANGE( __func__ , ## __VA_ARGS__, "" )); \
54                 }                                                               \
55         }while (0)
56
57 #define MAX_NAME_LEN 256
58
59 struct sos_handle {
60         rados_t cluster;
61         rados_ioctx_t ioctx;
62         sos_cb_t cb;
63         char *sos_pool;
64 };
65
66 struct rados_arg {
67         sos_handle_t sos;
68         struct sos_request *req;
69         volatile unsigned long state;
70         char obj_name[MAX_NAME_LEN];
71 #ifdef __SOS_TIME
72         struct timeval start;
73 #endif
74 };
75
76 /* sos debug level */
77 volatile unsigned int sos_debug_level=0;
78
79 void sos_set_debug_level(unsigned int level)
80 {
81         sos_debug_level = level;
82 }
83
84 static int handle_io(sos_handle_t sos, struct sos_request *req);
85
86 sos_handle_t sos_init(sos_cb_t cb)
87 {
88         sos_handle_t sos = (sos_handle_t) malloc(sizeof(struct sos_handle));
89         sos->cb = cb;
90
91         if (rados_create(&sos->cluster, NULL) < 0) {
92                 printf("Rados create failed!\n");
93                 return NULL;
94         }
95         SOSLOG(1, "Rados create OK \n");
96         if (rados_conf_read_file(sos->cluster, NULL) < 0){
97                 SOSLOG(0, "Error reading rados conf files!\n");
98                 return NULL;
99         }
100         if (rados_connect(sos->cluster) < 0) {
101                 SOSLOG(0, "Rados connect failed!\n");
102                 rados_shutdown(sos->cluster);
103                 free(sos);
104                 return NULL;
105         }
106         SOSLOG(1, "Rados connect OK \n");
107         if (rados_pool_lookup(sos->cluster, SOS_POOL) < 0) {
108                 SOSLOG(0, "Pool does not exists. I will try to create it\n");
109                 if (rados_pool_create(sos->cluster, SOS_POOL) < 0){
110                         SOSLOG(0, "Couldn't create pool!\n");
111                         rados_shutdown(sos->cluster);
112                         free(sos);
113                 return NULL;
114                 }
115                 SOSLOG(1, "Pool created.\n");
116         }
117         if (rados_ioctx_create(sos->cluster, SOS_POOL, &(sos->ioctx)) < 0) {
118                 SOSLOG(0, "ioctx create problem.\n");
119                 rados_shutdown(sos->cluster);
120                 free(sos);
121                 return NULL;
122         }
123
124         return sos;
125 }
126
127 void sos_shut(sos_handle_t sos)
128 {
129         rados_ioctx_destroy(sos->ioctx);
130         rados_shutdown(sos->cluster);
131         free(sos);
132         return;
133 }
134
135 int sos_submit(sos_handle_t sos, struct sos_request *req)
136 {
137         int r;
138         switch (req->op){
139         case S_READ:
140         case S_WRITE:
141                 r =handle_io(sos, req);
142                 break;
143         case S_NONE:
144         default:
145                 r = -1;
146         }
147         return r;
148 }
149
150
151 int sos_isRead(struct sos_request *req)
152 {
153         /* lets define this for now */
154         return (req->op == S_READ);
155 }
156
157
158 void rados_ack_cb(rados_completion_t c, void *arg)
159 {
160         int ret = rados_aio_get_return_value(c);
161         struct rados_arg *rarg = (struct rados_arg *) arg;
162         sos_handle_t sos = rarg->sos;
163         struct sos_request *req = rarg->req;
164
165 #ifdef __SOS_TIME
166         /* calculate time for ack */
167         struct timeval tv;
168         unsigned long us;
169         gettimeofday(&tv, NULL);
170         timersub(&tv, &rarg->start, &tv);
171         us = tv.tv_sec*1000000 +tv.tv_usec;
172         SOSLOG(2, "Request %lu acked after %lu us\n", req->id, us);
173 #endif
174         SOSLOG(2, "Request %lu acked with ret value %d \n", req->id, ret);
175
176         /* rados writes return 0 upon success or an error code. so fix retval to
177          * represent bytes succesfully written.
178          */
179         if (req->op == S_WRITE && ret == 0)
180                 req->retval = req->size;
181         else    
182                 req->retval = ret;
183         if (ret < 0) {
184                 sos->cb(req, S_NOTIFY_FAIL);
185                 rarg->state = S_FAILED;
186         }
187         else {
188                 sos->cb(req, S_NOTIFY_ACK);
189                 rarg->state = S_ACKED;
190         }
191         /* substitute with rarg->istherecommit ? */
192         if (sos_isRead(req) || !(req->flags & SF_FUA)){
193                 /* no commit, so free rarg */
194                 free(rarg);
195                 rados_aio_release(c);
196         }
197 }
198
199 void rados_commit_cb(rados_completion_t c, void *arg)
200 {
201         int ret = rados_aio_get_return_value(c);
202         struct rados_arg *rarg = (struct rados_arg *) arg;
203         sos_handle_t sos = rarg->sos;
204         struct sos_request *req = rarg->req;
205
206 #ifdef __SOS_TIME       
207         /* calculate time for commit */
208         struct timeval tv;
209         unsigned long us;
210         gettimeofday(&tv, NULL);
211         timersub(&tv, &rarg->start, &tv);
212         us = tv.tv_sec*1000000 +tv.tv_usec;
213         SOSLOG(2, "Request %lu commited after %lu us\n", req->id, us);
214 #endif
215         SOSLOG(2, "Request %lu commited with ret value %d \n", req->id, ret);
216         
217         /* rados writes return 0 upon success or an error code. so fix retval to
218          * represent bytes succesfully written.
219          */
220         if (req->op == S_WRITE && ret == 0)
221                 req->retval = req->size;
222         else    
223                 req->retval = ret;
224         if (ret < 0 && !(rarg->state & S_FAILED)) {
225                 /* notify failure only once */
226                 sos->cb(req, S_NOTIFY_FAIL);
227         }
228         /* discard failed commits with failed acks */
229         else if (ret >= 0 ) {
230                 sos->cb(req, S_NOTIFY_COMMIT);
231         }
232         free(rarg); 
233         rados_aio_release(c);
234 }
235
236 static int handle_async_io(sos_handle_t sos, struct sos_request *req){
237         int r;
238         rados_completion_t rados_compl;
239         struct rados_arg *rarg;
240         if (req->targetlen >= MAX_NAME_LEN){
241                 req->retval = -1;
242                 return -1;
243         }
244         rarg = malloc(sizeof(struct rados_arg));
245         if (!rarg){
246                 return -1;
247         }
248         rarg->sos = sos;
249         rarg->req = req;
250         rarg->state = S_PENDING;
251         strncpy(rarg->obj_name, req->target, req->targetlen);
252         rarg->obj_name[req->targetlen]=0;
253         SOSLOG(2, "Request %lu assigned to object[%u]: %s  \n", req->id, \
254                         req->targetlen, rarg->obj_name);
255         
256 #ifdef __SOS_TIME       
257         /* set time request started */
258         gettimeofday(&rarg->start, NULL);
259 #endif
260         if (!sos_isRead(req) && (req->flags & SF_FUA))
261                 r = rados_aio_create_completion(rarg,NULL, rados_commit_cb,
262                                 &rados_compl);
263         else    
264                 r = rados_aio_create_completion(rarg, rados_ack_cb, NULL, 
265                                 &rados_compl);
266         if (r < 0) {
267                 free (rarg);
268                 return r;
269         }
270
271         if (sos_isRead(req)) {
272                 r = rados_aio_read(sos->ioctx, rarg->obj_name, rados_compl,
273                                 req->data, req->size, req->offset);
274         }
275         else {
276                 r = rados_aio_write(sos->ioctx, rarg->obj_name, rados_compl, 
277                                 req->data, req->size, req->offset);
278         }
279         if (r < 0){ 
280                 rados_aio_release(rados_compl);
281                 free (rarg);
282         }
283         
284         return r;
285 }
286
287 static int handle_sync_io(sos_handle_t sos, struct sos_request *req){
288         int r;
289         rados_completion_t rados_compl;
290         struct rados_arg *rarg;
291         if (req->targetlen>= MAX_NAME_LEN){
292                 req->retval = -1;
293                 return -1;
294         }
295         rarg = malloc(sizeof(struct rados_arg));
296         if (!rarg){
297                 return -1;
298         }
299         rarg->sos = sos;
300         rarg->req = req;
301         rarg->state = S_PENDING;
302         strncpy(rarg->obj_name, req->target, req->targetlen);
303         rarg->obj_name[req->targetlen]=0;
304         SOSLOG(2, "Request %lu assigned to object[%u]: %s  \n", req->id, \
305                         req->targetlen, rarg->obj_name);
306 #ifdef __SOS_TIME       
307         /* set time request started */
308         gettimeofday(&rarg->start, NULL);
309 #endif
310         
311         if (sos_isRead(req)) {
312                 r = rados_read(sos->ioctx, rarg->obj_name, req->data, req->size,
313                                 req->offset);
314         }
315         else {
316                 if (req->flags & SF_FUA) {
317                         r = rados_aio_create_completion(rarg,NULL, NULL,
318                                        &rados_compl);
319                         if (r < 0)
320                                 goto syncio_exit;
321                         r = rados_aio_write(sos->ioctx, rarg->obj_name, rados_compl,
322                                         req->data, req->size, req->offset);
323                         if (r < 0){
324                                 rados_aio_release(rados_compl);
325                                 goto syncio_exit;
326                         }
327                         r = rados_aio_wait_for_safe(rados_compl);
328                         rados_aio_release(rados_compl);
329                         /* there is no sync FUA. rados should be patched to support 
330                          * sync safe writes.
331
332                         r = rados_safe_write(sos->ioctx, rarg->obj_name, req->data, 
333                                         req->size, req->offset);
334                          */
335                 }
336                 else {
337                         r = rados_write(sos->ioctx, rarg->obj_name, req->data, 
338                                         req->size, req->offset);
339                         /* TODO does return value need to be fixed as in aio write ? */
340                 }
341         }
342 syncio_exit:
343         free(rarg);
344         req->retval = r;
345         return r;
346 }
347
348 static int handle_io(sos_handle_t sos, struct sos_request *req)
349 {
350         int r;
351         if (!req->size && req->flags & SF_FLUSH){
352                 r = rados_aio_flush(sos->ioctx);
353                 req->retval = r;
354                 return r;
355         }
356         if (req->flags & SF_SYNC)
357                 r = handle_sync_io(sos,req);
358         else 
359                 r = handle_async_io(sos,req);
360
361         return r;
362 }