11 #include <xseg/xseg.h>
12 #include <util_libs/user/sos/sos.h>
16 #include <sys/syscall.h>
18 /* maybe add this to store struct */
19 #define objsize (4*1024*1024)
20 #define MAX_VOL_NAME 242
22 static int usage(void)
24 printf("Usage: ./sosd <path_to_disk_image> [options]\n"
25 "Options: [-p portno]\n"
26 " [-g type:name:nr_ports:nr_requests:request_size:extra_size:page_shift]\n"
27 " [-n nr_parallel_ops]\n");
32 struct xseg_request *req;
34 struct sos_request sos_req;
35 char objname[MAX_VOL_NAME +1 + 12 + 1];
41 struct xseg_port *xport;
48 struct xq resubmit_ops;
56 static unsigned int verbose;
58 static void sigaction_handler(int sig, siginfo_t *siginfo, void *arg)
63 static void signal_self(struct store *store)
65 union sigval sigval = {0};
66 pid_t me = store->pid;
67 if (sigqueue(me, SIGIO, sigval) < 0)
71 static int wait_signal(struct store *store)
76 uint32_t usec_timeout = 5000;
78 ts.tv_sec = usec_timeout / 1000000;
79 ts.tv_nsec = 1000 * (usec_timeout - ts.tv_sec * 1000000);
81 r = sigtimedwait(&store->signal_set, &siginfo, &ts);
85 return siginfo.si_signo;
89 static struct io *alloc_io(struct store *store)
91 xqindex idx = xq_pop_head(&store->free_ops, 1);
94 return store->ios + idx;
97 static inline void free_io(struct store *store, struct io *io)
99 xqindex idx = io - store->ios;
101 xq_append_head(&store->free_ops, idx, 1);
102 /* not the right place. sosd_loop couldn't sleep because of that
103 * needed for flush support. maybe this should go to complete function
109 static void resubmit_io(struct store *store, struct io *io)
111 xqindex idx = io - store->ios;
112 xq_append_tail(&store->resubmit_ops, idx, 1);
115 static struct io* get_resubmitted_io(struct store *store)
117 xqindex idx = xq_pop_head(&store->resubmit_ops, 1);
120 return store->ios + idx;
123 static void log_io(char *msg, struct io *io)
125 char target[64], data[64];
126 /* null terminate name in case of req->target is less than 63 characters,
127 * and next character after name (aka first byte of next buffer) is not
130 unsigned int end = (io->req->targetlen> 63) ? 63 : io->req->targetlen;
132 strncpy(target, io->req->target, end);
134 strncpy(data, io->req->data, 63);
136 printf("%s: sos req id:%u, op:%u %llu:%lu serviced: %lu, retval: %lu, reqstate: %u\n"
137 "target[%u]:'%s', data[%llu]:\n%s------------------\n\n",
139 (unsigned int)io->sos_req.id,
140 (unsigned int)io->req->op,
141 (unsigned long long)io->sos_req.offset,
142 (unsigned long)io->sos_req.size,
143 (unsigned long)io->req->serviced,
144 (unsigned long)io->retval,
145 (unsigned int)io->req->state,
146 (unsigned int)io->req->targetlen, target,
147 (unsigned long long)io->req->datalen, data);
151 static void complete(struct store *store, struct io *io)
153 struct xseg_request *req = io->req;
157 gettimeofday(&end, NULL);
158 timersub(&end, &io->start, &end);
159 us = end.tv_sec*1000000 +end.tv_usec;
160 printf("sosd: Request %lu completed after %lu us\n", io->sos_req.id, us);
163 req->state |= XS_SERVED;
164 log_io("complete", io);
165 xseg_respond(store->xseg, req->portno, req);
166 xseg_signal(store->xseg, req->portno);
170 static void fail(struct store *store, struct io *io)
172 struct xseg_request *req = io->req;
173 req->state |= XS_FAILED;
175 xseg_respond(store->xseg, req->portno, req);
176 xseg_signal(store->xseg, req->portno);
180 static void pending(struct store *store, struct io *io)
182 io->req->state = XS_PENDING;
185 static void handle_unknown(struct store *store, struct io *io)
187 struct xseg_request *req = io->req;
188 snprintf(req->data, req->datalen, "unknown request op");
192 static int32_t get_sos_op(uint32_t xseg_op)
204 static uint32_t get_sos_flags(uint32_t xseg_flags)
207 if (xseg_flags & XF_FLUSH) {
210 if (xseg_flags & XF_FUA) {
216 static int calculate_sosreq(struct xseg_request *xseg_req, struct sos_request *sos_req)
222 /* get object name from offset in volume */
223 buf = sos_req->target;
224 suffix = (unsigned int) ((xseg_req->offset+xseg_req->serviced) / (uint64_t)objsize) ;
225 // printf("suffix: %u\n", suffix);
226 if (xseg_req->targetlen> MAX_VOL_NAME){
227 printf("xseg_req targetlen > MAX_VOL_NAME\n");
230 strncpy(buf, xseg_req->target, xseg_req->targetlen);
231 buf[xseg_req->targetlen] = '_';
232 r = snprintf(buf+xseg_req->targetlen+1, 13, "%012u", suffix);
236 //sos_req->target = buf;
237 sos_req->targetlen = xseg_req->targetlen+1+12;
239 /* offset should be set to offset in object */
240 sos_req->offset = (xseg_req->offset + xseg_req->serviced) % objsize;
241 /* sos_req offset + sos_req size < objsize always
242 * request data up to the end of object.
244 sos_req->size = (xseg_req->datalen - xseg_req->serviced) ; /* should this be xseg_req->size ? */
245 if (sos_req->size > objsize - sos_req->offset)
246 sos_req->size = objsize - sos_req->offset;
247 /* this should have been checked before this call */
248 if (xseg_req->serviced < xseg_req->datalen)
249 sos_req->data = xseg_req->data + xseg_req->serviced;
252 // printf("name: %s, size: %lu, offset: %lu, data:%s\n", sos_req->target,
253 // sos_req->size, sos_req->offset, sos_req->data);
257 static void prepare_sosreq(struct store *store, struct io *io)
259 struct xseg_request *xseg_req = io->req;
260 struct sos_request *sos_req = &io->sos_req;
261 sos_req->flags = get_sos_flags(xseg_req->flags);
262 sos_req->state = S_PENDING;
264 sos_req->op = get_sos_op(xseg_req->op);
265 sos_req->priv = store;
266 sos_req->target = io->objname;
269 static inline void prepare_io(struct store *store, struct io *io)
271 prepare_sosreq(store, io);
272 /* Assign io id to sos_req id. This can be done as an initialization of
273 * ios, to avoid reseting it every time */
274 io->sos_req.id = io - store->ios;
278 static void handle_resubmit(struct store *store, struct io *io);
280 static void complete_rw(struct store *store, struct io *io)
283 struct xseg_request *req = io->req;
284 struct sos_request *sos_req = &io->sos_req;
285 if (req->state == XS_ACCEPTED) {
286 /* should not happen */
291 req->serviced += io->retval;
292 else if (io->retval == 0) {
293 /* reached end of object. zero out rest of data
294 * requested from this object
296 memset(sos_req->data, 0, sos_req->size);
297 req->serviced += sos_req->size;
299 else if (io->retval == -2) {
300 /* object not found. return zeros instead */
301 memset(sos_req->data, 0, sos_req->size);
302 req->serviced += sos_req->size;
309 /* request completed ? */
310 if (req->serviced >= req->datalen) {
316 printf("0.%p vs %p!\n", (void *)req, (void *)io->req);
318 /* should not happen */
326 log_io("resubmitting", io);
327 resubmit_io(store, io);
331 snprintf(req->data, req->datalen,
332 "wtf, corrupt op %u?\n", req->op);
338 static void handle_read_write(struct store *store, struct io *io)
341 struct xseg_request *req = io->req;
342 struct sos_request *sos_req = &io->sos_req;
343 struct io *resubmit_io;
346 printf("0.%p vs %p!\n", (void *)req, (void *)io->req);
348 prepare_io(store, io);
350 if (req->flags & XF_FLUSH) {
352 /* note that with FLUSH/size == 0
353 * there will probably be a (uint64_t)-1 offset */
355 /* size must be zero */
357 /* all these should be irrelevant on a flush request */
359 sos_req->targetlen= 0;
363 * make sure all pending requests are completed and then
364 * perform flush request to flush them to disk.
366 while (xq_size(&store->free_ops) != store->nr_ops){
368 /* handle any possible resubmissions */
369 resubmit_io = get_resubmitted_io(store);
371 handle_resubmit(store, resubmit_io);
372 resubmit_io = get_resubmitted_io(store);
375 r = sos_submit(store->sos, sos_req);
392 r = calculate_sosreq(req, sos_req);
401 //log_io("submit", io);
403 r = sos_submit(store->sos, sos_req);
406 snprintf(req->data, req->datalen,
407 "wtf, corrupt op %u?\n", req->op);
413 strerror_r(errno, req->data, req->datalen);
419 static void handle_returned(struct store *store, struct io *io)
421 io->retval = io->sos_req.retval;
422 switch (io->req->op){
425 complete_rw(store, io);
428 if (io->sos_req.state & S_FAILED)
435 /* this is safe for now, as long as callback is only called once.
436 * if callback gets called, then sos_request has been completed and no race
439 static int sos_cb(struct sos_request *sos_req, unsigned long event)
441 struct store *store = (struct store *) sos_req->priv;
442 struct io *io = (struct io*) store->ios + sos_req->id;
444 if (event == S_NOTIFY_FAIL){
445 sos_req->state = S_FAILED;
447 else if (event == S_NOTIFY_ACK) {
448 sos_req->state = S_ACKED;
450 else if (event == S_NOTIFY_COMMIT){
451 sos_req->state = S_COMMITED;
453 handle_returned(store, io);
457 static void handle_info(struct store *store, struct io *io)
459 struct xseg_request *req = io->req;
461 *((uint64_t *) req->data) = store->size;
462 req->serviced = req->datalen = sizeof(store->size);
463 io->retval = req->datalen;
468 static void dispatch(struct store *store, struct io *io)
470 switch (io->req->op) {
473 handle_read_write(store, io); break;
475 handle_info(store, io); break;
478 handle_unknown(store, io);
482 static void handle_resubmit(struct store *store, struct io *io)
487 static void handle_accepted(struct store *store, struct io *io)
489 struct xseg_request *req = io->req;
491 req->state = XS_ACCEPTED;
493 //log_io("accepted", io);
494 gettimeofday(&io->start, NULL);
498 static int sosd_loop(struct store *store)
500 struct xseg *xseg = store->xseg;
501 uint32_t portno = store->portno;
502 struct io *io, *resubmit_io;
503 struct xseg_request *accepted;
507 xseg_prepare_wait(xseg, portno);
508 io = alloc_io(store);
510 accepted = xseg_accept(xseg, portno);
512 xseg_cancel_wait(xseg, portno);
514 handle_accepted(store, io);
518 resubmit_io = get_resubmitted_io(store);
520 xseg_cancel_wait(xseg, portno);
521 handle_resubmit(store, resubmit_io);
523 if (!accepted && !resubmit_io)
524 xseg_wait_signal(xseg, 10000);
530 static struct xseg *join(char *spec)
532 struct xseg_config config;
535 (void)xseg_parse_spec(spec, &config);
536 xseg = xseg_join(config.type, config.name, "posix", NULL);
540 (void)xseg_create(&config);
541 return xseg_join(config.type, config.name, "posix", NULL);
544 static int sosd(char *path, unsigned long size, uint32_t nr_ops,
545 char *spec, long portno)
549 store = malloc(sizeof(struct store));
555 store->sos = sos_init(sos_cb);
557 fprintf(stderr, "SOS init failed\n");
567 store->pid = syscall(SYS_gettid);
569 // just a temp solution.
570 // Make all images 20GB. Maybe use an image header object for a more
571 // permantent solution.
572 store->size=20*1024*1024;
574 if (sigemptyset(&store->signal_set))
575 perror("sigemptyset");
577 if (sigaddset(&store->signal_set, SIGIO))
581 store->nr_ops = nr_ops;
582 store->free_bufs = calloc(nr_ops, sizeof(xqindex));
583 if (!store->free_bufs)
586 store->resubmit_bufs = calloc(nr_ops, sizeof(xqindex));
587 if (!store->resubmit_bufs)
590 store->ios = calloc(nr_ops, sizeof(struct io));
597 xq_init_seq(&store->free_ops, nr_ops, nr_ops, store->free_bufs);
598 xq_init_empty(&store->resubmit_ops, nr_ops, store->resubmit_bufs);
601 if (xseg_initialize()) {
602 printf("cannot initialize library\n");
605 store->xseg = join(spec);
609 store->xport = xseg_bind_port(store->xseg, portno);
611 printf("cannot bind to port %ld\n", portno);
615 store->portno = xseg_portno(store->xseg, store->xport);
616 printf("sosd on port %u/%u\n",
617 store->portno, store->xseg->config.nr_ports);
619 return sosd_loop(store);
622 int main(int argc, char **argv)
624 char *path, *spec = "";
629 unsigned int debug_level = 0;
639 for (i = 2; i < argc; i++) {
640 if (!strcmp(argv[i], "-g") && i + 1 < argc) {
646 if (!strcmp(argv[i], "-p") && i + 1 < argc) {
647 portno = strtoul(argv[i+1], NULL, 10);
652 if (!strcmp(argv[i], "-p") && i + 1 < argc) {
653 nr_ops = strtoul(argv[i+1], NULL, 10);
657 if (!strcmp(argv[i], "-v") ) {
663 sos_set_debug_level(debug_level);
664 verbose = debug_level;
669 return sosd(path, size, nr_ops, spec, portno);