2 * Copyright 2012 GRNET S.A. All rights reserved.
4 * Redistribution and use in source and binary forms, with or
5 * without modification, are permitted provided that the following
8 * 1. Redistributions of source code must retain the above
9 * copyright notice, this list of conditions and the following
11 * 2. Redistributions in binary form must reproduce the above
12 * copyright notice, this list of conditions and the following
13 * disclaimer in the documentation and/or other materials
14 * provided with the distribution.
16 * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17 * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19 * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20 * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23 * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24 * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26 * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27 * POSSIBILITY OF SUCH DAMAGE.
29 * The views and conclusions contained in the software and
30 * documentation are those of the authors and should not be
31 * interpreted as representing official policies, either expressed
32 * or implied, of GRNET S.A.
38 #include <sys/types.h>
45 #include <xseg/xseg.h>
46 #include <util_libs/user/sos/sos.h>
50 #include <sys/syscall.h>
/*
 * Fixed size, in bytes, of the backing objects a volume is split into
 * (4 MiB); calculate_sosreq() derives the object index and the in-object
 * offset from it.
 */
52 /* maybe add this to store struct */
53 #define objsize (4*1024*1024)
/*
 * Maximum accepted volume-name length. Object names are built as
 * "<name>_<12 digits>", so io->objname reserves MAX_VOL_NAME + 1 + 12 + 1
 * bytes.
 */
54 #define MAX_VOL_NAME 242
/*
 * Print sosd command-line usage to stdout.
 * NOTE(review): main() also recognises "-v" (debug level), which this text
 * does not mention. The function's return statement is not part of this
 * listing.
 */
56 static int usage(void)
58 printf("Usage: ./sosd <path_to_disk_image> [options]\n"
59 "Options: [-p portno]\n"
60 " [-g type:name:nr_ports:nr_requests:request_size:extra_size:page_shift]\n"
61 " [-n nr_parallel_ops]\n");
/*
 * Fields of the per-request bookkeeping struct (struct io; its opening and
 * closing lines are not part of this listing):
 *   req     - the xseg request being serviced;
 *   sos_req - the SOS request derived from req (see prepare_sosreq());
 *   objname - storage that sos_req.target points at, sized for
 *             "<volume name>_<12-digit object index>" plus a NUL.
 */
66 struct xseg_request *req;
68 struct sos_request sos_req;
69 char objname[MAX_VOL_NAME +1 + 12 + 1];
/*
 * Fields of struct store (only part of the definition is visible here):
 *   xport        - port returned by xseg_bind_port() in sosd();
 *   resubmit_ops - queue of io indices awaiting resubmission, filled by
 *                  resubmit_io() and drained by get_resubmitted_io().
 */
75 struct xseg_port *xport;
82 struct xq resubmit_ops;
/* Debug verbosity; main() assigns it the debug_level it also passes to sos_set_debug_level(). */
90 static unsigned int verbose;
/*
 * Signal handler with the three-argument (sa_sigaction-style) signature.
 * NOTE(review): its body, and where it is installed, are not part of this
 * listing.
 */
92 static void sigaction_handler(int sig, siginfo_t *siginfo, void *arg)
/*
 * Wake up a waiter blocked in wait_signal() by queueing SIGIO (the signal
 * sosd() adds to store->signal_set) to store->pid.
 * NOTE(review): sosd() fills store->pid from syscall(SYS_gettid), but
 * sigqueue() takes a process ID. The signal only reaches the intended
 * thread if that thread ID equals the PID (i.e. it is the main thread) --
 * confirm. The branch taken when sigqueue() fails is not part of this
 * listing.
 */
97 static void signal_self(struct store *store)
99 union sigval sigval = {0};
100 pid_t me = store->pid;
101 if (sigqueue(me, SIGIO, sigval) < 0)
/*
 * Block for up to 5000 microseconds until a signal from store->signal_set
 * is pending, and return its number (siginfo.si_signo).
 * The microsecond timeout is split into whole seconds plus the remaining
 * nanoseconds for sigtimedwait(). How a timeout or error from
 * sigtimedwait() is reported is not part of this listing.
 */
105 static int wait_signal(struct store *store)
110 uint32_t usec_timeout = 5000;
112 ts.tv_sec = usec_timeout / 1000000;
113 ts.tv_nsec = 1000 * (usec_timeout - ts.tv_sec * 1000000);
115 r = sigtimedwait(&store->signal_set, &siginfo, &ts);
119 return siginfo.si_signo;
/*
 * Take a free io slot: pop an index from the head of store->free_ops and
 * return &store->ios[idx].
 * NOTE(review): the empty-queue handling between the pop and the return
 * is not part of this listing.
 */
123 static struct io *alloc_io(struct store *store)
125 xqindex idx = xq_pop_head(&store->free_ops, 1);
128 return store->ios + idx;
/*
 * Return io to the free pool by pushing its index (its position in
 * store->ios) back onto the head of store->free_ops.
 */
131 static inline void free_io(struct store *store, struct io *io)
133 xqindex idx = io - store->ios;
135 xq_append_head(&store->free_ops, idx, 1);
136 /* not the right place. sosd_loop couldn't sleep because of that
137 * needed for flush support. maybe this should go to complete function
/*
 * Queue io to be submitted again by appending its index to the tail of
 * store->resubmit_ops. get_resubmitted_io() pops from the head, so
 * resubmissions are served in FIFO order.
 */
143 static void resubmit_io(struct store *store, struct io *io)
145 xqindex idx = io - store->ios;
146 xq_append_tail(&store->resubmit_ops, idx, 1);
/*
 * Pop the oldest pending resubmission from store->resubmit_ops and return
 * the corresponding io. The empty-queue check is not part of this listing.
 */
149 static struct io* get_resubmitted_io(struct store *store)
151 xqindex idx = xq_pop_head(&store->resubmit_ops, 1);
154 return store->ios + idx;
/*
 * Print a debug dump of io for the event named by msg. The dump covers the
 * SOS request id, offset and size; the xseg op, serviced count and state;
 * io->retval; and up to 63 bytes each of the request target and data,
 * copied into local 64-byte buffers.
 * NOTE(review): the copy from io->req->data is bounded only by 63, not by
 * io->req->datalen, so it may read past a shorter data buffer that has no
 * NUL within its length.
 */
157 static void log_io(char *msg, struct io *io)
159 char target[64], data[64];
160 /* null terminate name in case of req->target is less than 63 characters,
161 * and next character after name (aka first byte of next buffer) is not
164 unsigned int end = (io->req->targetlen> 63) ? 63 : io->req->targetlen;
166 strncpy(target, io->req->target, end);
168 strncpy(data, io->req->data, 63);
170 printf("%s: sos req id:%u, op:%u %llu:%lu serviced: %lu, retval: %lu, reqstate: %u\n"
171 "target[%u]:'%s', data[%llu]:\n%s------------------\n\n",
173 (unsigned int)io->sos_req.id,
174 (unsigned int)io->req->op,
175 (unsigned long long)io->sos_req.offset,
176 (unsigned long)io->sos_req.size,
177 (unsigned long)io->req->serviced,
178 (unsigned long)io->retval,
179 (unsigned int)io->req->state,
180 (unsigned int)io->req->targetlen, target,
181 (unsigned long long)io->req->datalen, data);
/*
 * Finish a successful request. Reports the elapsed time since io->start
 * (set in handle_accepted()), sets XS_SERVED on req, logs it, and hands it
 * back to the requesting port via xseg_respond()/xseg_signal().
 * NOTE(review): the timing printf uses %lu for io->sos_req.id, whereas
 * log_io() casts the same field to unsigned int; confirm the field's type
 * matches the format.
 */
185 static void complete(struct store *store, struct io *io)
187 struct xseg_request *req = io->req;
191 gettimeofday(&end, NULL);
192 timersub(&end, &io->start, &end);
193 us = end.tv_sec*1000000 +end.tv_usec;
194 printf("sosd: Request %lu completed after %lu us\n", io->sos_req.id, us);
197 req->state |= XS_SERVED;
198 log_io("complete", io);
199 xseg_respond(store->xseg, req->portno, req);
200 xseg_signal(store->xseg, req->portno);
/*
 * Finish a request with an error: set XS_FAILED on req and hand it back to
 * the requesting port via xseg_respond()/xseg_signal().
 */
204 static void fail(struct store *store, struct io *io)
206 struct xseg_request *req = io->req;
207 req->state |= XS_FAILED;
209 xseg_respond(store->xseg, req->portno, req);
210 xseg_signal(store->xseg, req->portno);
/*
 * Mark io's request as pending. This assigns XS_PENDING outright,
 * replacing any state bits set earlier.
 */
214 static void pending(struct store *store, struct io *io)
216 io->req->state = XS_PENDING;
/*
 * Reject a request whose op is not supported: write "unknown request op"
 * into req->data, bounded by req->datalen. What follows (presumably
 * fail()) is not part of this listing.
 */
219 static void handle_unknown(struct store *store, struct io *io)
221 struct xseg_request *req = io->req;
222 snprintf(req->data, req->datalen, "unknown request op");
/* Map an xseg op code to the corresponding SOS op; the body is not part of this listing. */
226 static int32_t get_sos_op(uint32_t xseg_op)
/*
 * Translate xseg request flags into SOS flags. XF_FLUSH and XF_FUA are
 * tested independently; the SOS flag values they map to are not part of
 * this listing.
 */
238 static uint32_t get_sos_flags(uint32_t xseg_flags)
241 if (xseg_flags & XF_FLUSH) {
244 if (xseg_flags & XF_FUA) {
/*
 * Fill sos_req for the next chunk of xseg_req, starting at byte
 * offset + serviced of the volume:
 *   - target: "<xseg target>_<object index as %012u>" written into
 *     sos_req->target, where object index = (offset + serviced) / objsize;
 *   - targetlen: xseg targetlen + 1 + 12;
 *   - offset: (offset + serviced) % objsize, i.e. the position inside the
 *     object;
 *   - size: the remaining bytes (datalen - serviced), capped so the chunk
 *     does not run past the end of the object;
 *   - data: xseg_req->data advanced by serviced (only if serviced < datalen).
 * Targets longer than MAX_VOL_NAME are rejected; the returned error value
 * is not part of this listing. sos_req->target must point to at least
 * MAX_VOL_NAME + 1 + 12 + 1 bytes.
 */
250 static int calculate_sosreq(struct xseg_request *xseg_req, struct sos_request *sos_req)
256 /* get object name from offset in volume */
257 buf = sos_req->target;
258 suffix = (unsigned int) ((xseg_req->offset+xseg_req->serviced) / (uint64_t)objsize) ;
259 // printf("suffix: %u\n", suffix);
260 if (xseg_req->targetlen> MAX_VOL_NAME){
261 printf("xseg_req targetlen > MAX_VOL_NAME\n");
264 strncpy(buf, xseg_req->target, xseg_req->targetlen);
265 buf[xseg_req->targetlen] = '_';
266 r = snprintf(buf+xseg_req->targetlen+1, 13, "%012u", suffix);
270 //sos_req->target = buf;
271 sos_req->targetlen = xseg_req->targetlen+1+12;
273 /* offset should be set to offset in object */
274 sos_req->offset = (xseg_req->offset + xseg_req->serviced) % objsize;
275 /* sos_req offset + sos_req size < objsize always
276 * request data up to the end of object.
278 sos_req->size = (xseg_req->datalen - xseg_req->serviced) ; /* should this be xseg_req->size ? */
279 if (sos_req->size > objsize - sos_req->offset)
280 sos_req->size = objsize - sos_req->offset;
281 /* this should have been checked before this call */
282 if (xseg_req->serviced < xseg_req->datalen)
283 sos_req->data = xseg_req->data + xseg_req->serviced;
286 // printf("name: %s, size: %lu, offset: %lu, data:%s\n", sos_req->target,
287 // sos_req->size, sos_req->offset, sos_req->data);
/*
 * Initialise the chunk-independent parts of io->sos_req:
 *   - flags and op, translated from io->req;
 *   - state, set to S_PENDING;
 *   - priv, pointing back at store (sos_cb() reads it);
 *   - target, pointing at io->objname.
 */
291 static void prepare_sosreq(struct store *store, struct io *io)
293 struct xseg_request *xseg_req = io->req;
294 struct sos_request *sos_req = &io->sos_req;
295 sos_req->flags = get_sos_flags(xseg_req->flags);
296 sos_req->state = S_PENDING;
298 sos_req->op = get_sos_op(xseg_req->op);
299 sos_req->priv = store;
300 sos_req->target = io->objname;
/*
 * Prepare io->sos_req and set its id to io's index in store->ios. sos_cb()
 * uses that id to locate the io again.
 */
303 static inline void prepare_io(struct store *store, struct io *io)
305 prepare_sosreq(store, io);
306 /* Assign io id to sos_req id. This can be done as an initialization of
307 * ios, to avoid reseting it every time */
308 io->sos_req.id = io - store->ios;
/* Forward declaration: handle_read_write() calls handle_resubmit() before its definition. */
312 static void handle_resubmit(struct store *store, struct io *io);
/*
 * Account for one finished chunk of a read/write request.
 * Visible behaviour:
 *   - a request still in XS_ACCEPTED state is treated as "should not
 *     happen";
 *   - io->retval is added to req->serviced (the guarding condition is not
 *     visible here);
 *   - retval 0 (end of object) and retval -2 (object not found) zero-fill
 *     the chunk buffer and count the whole chunk as serviced;
 *   - once serviced >= datalen the request is finished; otherwise the io
 *     is logged and queued with resubmit_io() for the next chunk.
 * Some branches, including the one that reports "wtf, corrupt op", are
 * only partially visible in this listing.
 */
314 static void complete_rw(struct store *store, struct io *io)
317 struct xseg_request *req = io->req;
318 struct sos_request *sos_req = &io->sos_req;
319 if (req->state == XS_ACCEPTED) {
320 /* should not happen */
325 req->serviced += io->retval;
326 else if (io->retval == 0) {
327 /* reached end of object. zero out rest of data
328 * requested from this object
330 memset(sos_req->data, 0, sos_req->size);
331 req->serviced += sos_req->size;
333 else if (io->retval == -2) {
334 /* object not found. return zeros instead */
335 memset(sos_req->data, 0, sos_req->size);
336 req->serviced += sos_req->size;
343 /* request completed ? */
344 if (req->serviced >= req->datalen) {
350 printf("0.%p vs %p!\n", (void *)req, (void *)io->req);
352 /* should not happen */
360 log_io("resubmitting", io);
361 resubmit_io(store, io);
365 snprintf(req->data, req->datalen,
366 "wtf, corrupt op %u?\n", req->op);
/*
 * Build and submit the SOS request for a read/write io.
 * Flush requests (XF_FLUSH set): sos_req->targetlen is zeroed. The
 * function then loops until xq_size(&store->free_ops) equals
 * store->nr_ops, servicing queued resubmissions meanwhile, and finally
 * submits the flush with sos_submit().
 * Other requests: calculate_sosreq() computes the next object chunk and
 * the request is submitted with sos_submit(). On the error path
 * strerror_r() writes the errno text into req->data (the guarding
 * condition is not visible here).
 * NOTE(review): the local variable resubmit_io shadows the function
 * resubmit_io().
 * NOTE(review): the io being handled was popped from free_ops by
 * alloc_io(). Unless it is returned earlier (not visible here), the free
 * count can reach at most nr_ops - 1, and the flush wait loop could never
 * terminate -- verify.
 */
372 static void handle_read_write(struct store *store, struct io *io)
375 struct xseg_request *req = io->req;
376 struct sos_request *sos_req = &io->sos_req;
377 struct io *resubmit_io;
380 printf("0.%p vs %p!\n", (void *)req, (void *)io->req);
382 prepare_io(store, io);
384 if (req->flags & XF_FLUSH) {
386 /* note that with FLUSH/size == 0
387 * there will probably be a (uint64_t)-1 offset */
389 /* size must be zero */
391 /* all these should be irrelevant on a flush request */
393 sos_req->targetlen= 0;
397 * make sure all pending requests are completed and then
398 * perform flush request to flush them to disk.
400 while (xq_size(&store->free_ops) != store->nr_ops){
402 /* handle any possible resubmissions */
403 resubmit_io = get_resubmitted_io(store);
405 handle_resubmit(store, resubmit_io);
406 resubmit_io = get_resubmitted_io(store);
409 r = sos_submit(store->sos, sos_req);
426 r = calculate_sosreq(req, sos_req);
435 //log_io("submit", io);
437 r = sos_submit(store->sos, sos_req);
440 snprintf(req->data, req->datalen,
441 "wtf, corrupt op %u?\n", req->op);
447 strerror_r(errno, req->data, req->datalen);
/*
 * Process a returned SOS request. Copies sos_req.retval into io->retval,
 * then dispatches on the xseg op: read/write results go to complete_rw(),
 * and another case checks the S_FAILED bit of sos_req.state. The case
 * labels and the failure action are not part of this listing.
 */
453 static void handle_returned(struct store *store, struct io *io)
455 io->retval = io->sos_req.retval;
456 switch (io->req->op){
459 complete_rw(store, io);
462 if (io->sos_req.state & S_FAILED)
/*
 * Completion callback registered via sos_init() in sosd().
 * Recovers store from sos_req->priv and the io from sos_req->id. It then
 * records the event (S_NOTIFY_FAIL / _ACK / _COMMIT) in sos_req->state as
 * S_FAILED / S_ACKED / S_COMMITED (plain assignment, not OR-ing) and calls
 * handle_returned(). The return value is not part of this listing.
 */
469 /* this is safe for now, as long as callback is only called once.
470 * if callback gets called, then sos_request has been completed and no race
473 static int sos_cb(struct sos_request *sos_req, unsigned long event)
475 struct store *store = (struct store *) sos_req->priv;
476 struct io *io = (struct io*) store->ios + sos_req->id;
478 if (event == S_NOTIFY_FAIL){
479 sos_req->state = S_FAILED;
481 else if (event == S_NOTIFY_ACK) {
482 sos_req->state = S_ACKED;
484 else if (event == S_NOTIFY_COMMIT){
485 sos_req->state = S_COMMITED;
487 handle_returned(store, io);
/*
 * Answer an info request with the volume size. store->size is stored as a
 * uint64_t at the start of req->data, and serviced, datalen and io->retval
 * are all set to sizeof(store->size).
 * NOTE(review): no visible check that req->datalen is large enough, or
 * that req->data is suitably aligned, before the uint64_t store.
 */
491 static void handle_info(struct store *store, struct io *io)
493 struct xseg_request *req = io->req;
495 *((uint64_t *) req->data) = store->size;
496 req->serviced = req->datalen = sizeof(store->size);
497 io->retval = req->datalen;
/*
 * Route an accepted request by req->op to handle_read_write(),
 * handle_info() or (presumably as the default case) handle_unknown(). The
 * case labels are not part of this listing.
 */
502 static void dispatch(struct store *store, struct io *io)
504 switch (io->req->op) {
507 handle_read_write(store, io); break;
509 handle_info(store, io); break;
512 handle_unknown(store, io);
/*
 * Handle an io taken from the resubmit queue; called from sosd_loop() and
 * from the flush path of handle_read_write(). The body is not part of
 * this listing.
 */
516 static void handle_resubmit(struct store *store, struct io *io)
/*
 * Start servicing a newly accepted request: set its state to XS_ACCEPTED
 * and record io->start, which complete() uses for its elapsed-time report.
 * The rest of the body is not part of this listing.
 */
521 static void handle_accepted(struct store *store, struct io *io)
523 struct xseg_request *req = io->req;
525 req->state = XS_ACCEPTED;
527 //log_io("accepted", io);
528 gettimeofday(&io->start, NULL);
/*
 * Main service loop. Visible steps per iteration:
 *   1. xseg_prepare_wait() on the port;
 *   2. take a free io and try xseg_accept(); an accepted request goes to
 *      handle_accepted();
 *   3. service one queued resubmission via handle_resubmit();
 *   4. if neither step produced work, block in xseg_wait_signal() with a
 *      timeout argument of 10000.
 * NOTE(review): the local resubmit_io shadows the function resubmit_io().
 */
532 static int sosd_loop(struct store *store)
534 struct xseg *xseg = store->xseg;
535 uint32_t portno = store->portno;
536 struct io *io, *resubmit_io;
537 struct xseg_request *accepted;
541 xseg_prepare_wait(xseg, portno);
542 io = alloc_io(store);
544 accepted = xseg_accept(xseg, portno);
546 xseg_cancel_wait(xseg, portno);
548 handle_accepted(store, io);
552 resubmit_io = get_resubmitted_io(store);
554 xseg_cancel_wait(xseg, portno);
555 handle_resubmit(store, resubmit_io);
557 if (!accepted && !resubmit_io)
558 xseg_wait_signal(xseg, 10000);
/*
 * Attach to the xseg segment described by spec ("type:name:..."), passing
 * "posix" as xseg_join()'s third argument. If the first join does not
 * succeed (the check is not visible), the segment is created from the
 * parsed config and joined again. The results of xseg_parse_spec() and
 * xseg_create() are explicitly discarded.
 */
564 static struct xseg *join(char *spec)
566 struct xseg_config config;
569 (void)xseg_parse_spec(spec, &config);
570 xseg = xseg_join(config.type, config.name, "posix", NULL);
574 (void)xseg_create(&config);
575 return xseg_join(config.type, config.name, "posix", NULL);
/*
 * Set up and run the daemon, in this order:
 *   - allocate the store and initialise SOS with sos_cb as the callback;
 *   - record the calling thread id and build a signal set holding SIGIO;
 *   - allocate nr_ops io slots plus their free/resubmit index queues
 *     (free_ops starts full, resubmit_ops starts empty);
 *   - join the xseg segment from spec and bind portno;
 *   - run sosd_loop() and return its result.
 * NOTE(review): the path and size parameters are not referenced in the
 * visible lines.
 */
578 static int sosd(char *path, unsigned long size, uint32_t nr_ops,
579 char *spec, long portno)
583 store = malloc(sizeof(struct store));
589 store->sos = sos_init(sos_cb);
591 fprintf(stderr, "SOS init failed\n");
601 store->pid = syscall(SYS_gettid);
603 // just a temp solution.
604 // Make all images 20GB. Maybe use an image header object for a more
605 // permanent solution.
/* NOTE(review): 20*1024*1024 is 20 MiB, not the 20GB stated above; either the value or the comment is wrong. */
606 store->size=20*1024*1024;
608 if (sigemptyset(&store->signal_set))
609 perror("sigemptyset");
611 if (sigaddset(&store->signal_set, SIGIO))
615 store->nr_ops = nr_ops;
616 store->free_bufs = calloc(nr_ops, sizeof(xqindex));
617 if (!store->free_bufs)
620 store->resubmit_bufs = calloc(nr_ops, sizeof(xqindex));
621 if (!store->resubmit_bufs)
624 store->ios = calloc(nr_ops, sizeof(struct io));
631 xq_init_seq(&store->free_ops, nr_ops, nr_ops, store->free_bufs);
632 xq_init_empty(&store->resubmit_ops, nr_ops, store->resubmit_bufs);
635 if (xseg_initialize()) {
636 printf("cannot initialize library\n");
639 store->xseg = join(spec);
643 store->xport = xseg_bind_port(store->xseg, portno);
645 printf("cannot bind to port %ld\n", portno);
649 store->portno = xseg_portno(store->xseg, store->xport);
650 printf("sosd on port %u/%u\n",
651 store->portno, store->xseg->config.nr_ports);
653 return sosd_loop(store);
/*
 * Entry point. Per usage(), the disk image path comes first, followed by
 * options:
 *   -g <spec>             xseg segment spec;
 *   -p <portno>           port to bind;
 *   -n <nr_parallel_ops>  number of io slots;
 *   -v                    adjusts the debug level passed to SOS and
 *                         copied into `verbose`.
 * The parsed values are forwarded to sosd().
 * Fix: the nr_ops option was matched against "-p", the port option's flag,
 * so the "-n" flag advertised by usage() was never recognised.
 */
656 int main(int argc, char **argv)
658 char *path, *spec = "";
663 unsigned int debug_level = 0;
673 for (i = 2; i < argc; i++) {
674 if (!strcmp(argv[i], "-g") && i + 1 < argc) {
680 if (!strcmp(argv[i], "-p") && i + 1 < argc) {
681 portno = strtoul(argv[i+1], NULL, 10);
686 if (!strcmp(argv[i], "-n") && i + 1 < argc) {
687 nr_ops = strtoul(argv[i+1], NULL, 10);
691 if (!strcmp(argv[i], "-v") ) {
697 sos_set_debug_level(debug_level);
698 verbose = debug_level;
703 return sosd(path, size, nr_ops, spec, portno);