2 * The Pithos File Blocker Peer (pfiled)
18 #include <sys/sendfile.h>
20 #include <xseg/xseg.h>
21 #include <xseg/protocol.h>
23 #include "common.h" /* FIXME: */
25 #define MAX_PATH_SIZE 1024
26 #define MAX_FILENAME_SIZE 255
28 /* default concurrency level (number of threads) */
29 #define DEFAULT_NR_OPS 16
/* NOTE(review): the string literal below is 63 hex characters long, whereas a
 * SHA-256 hex digest has 64. It appears to be the SHA-256 of empty input with
 * the final '5' missing -- confirm against the intended zero-block hash. */
31 /* Pithos hash for the zero block
32 * FIXME: Should it be hardcoded?
35 "e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b85"
38 * Globals, holding command-line arguments
// xseg port number (PORT in the usage text); -1 until set, and
// parse_cmdline() rejects a value below 0.
40 long cmdline_portno = -1;
// xseg spec string (XSEG_SPEC); strdup'ed from the option argument and
// later passed to join_or_create(). Mandatory.
41 char *cmdline_xseg_spec = NULL;
// Copy of the first positional argument (PATH in the usage text).
42 char *cmdline_path = NULL;
// Copy of the second positional argument (VPATH in the usage text).
43 char *cmdline_vpath = NULL;
// Number of outstanding operations (NR_OPS); pfiled_init() uses it for
// nr_ops and, doubled, for the fd cache size. Must be >= 1.
44 long cmdline_nr_ops = DEFAULT_NR_OPS;
// Non-zero enables the per-request trace printed by dispatch().
45 long cmdline_verbose = 0;
// Usage/help text for the program.
// argv0: program name; presumably substituted for the "%s" in the synopsis
//        line -- the call that prints the text is not visible here.
// NOTE(review): the return value is not visible in this excerpt.
47 static int usage(char *argv0)
50 "Usage: %s <PATH> <VPATH> [-p PORT] [-g XSEG_SPEC] [-n NR_OPS] [-v]\n\n"
52 "\tPATH: path to pithos data blocks\n"
53 "\tVPATH: path to modified volume blocks\n"
54 "\tPORT: xseg port to listen for requests on\n"
55 "\tXSEG_SPEC: xseg spec as 'type:name:nr_ports:nr_requests:"
56 "request_size:extra_size:page_shift'\n"
57 "\tNR_OPS: number of outstanding xseg requests\n"
58 "\t-v: verbose mode\n",
64 /* fdcache_node flags */
65 #define READY (1 << 1)
67 /* fdcache node info */
70 volatile unsigned int ref;
71 volatile unsigned long time;
72 volatile unsigned int flags;
74 char target[MAX_FILENAME_SIZE + 1];
80 struct xseg_port *xport;
87 struct sigevent sigevent;
90 uint64_t handled_reqs;
92 struct fdcache_node *fdcache;
94 pthread_mutex_t cache_lock;
95 char path[MAX_PATH_SIZE + 1];
96 char vpath[MAX_PATH_SIZE + 1];
100 * pfiled specific structure
101 * containing information on a pending I/O operation
104 struct pfiled *pfiled;
105 struct xseg_request *req;
110 pthread_mutex_t lock;
/* Counter declared alongside the signal handler.
 * NOTE(review): the handler body is not visible here; confirm how (and
 * whether) this counter is updated. */
114 static unsigned long sigaction_count;
/* Three-argument (SA_SIGINFO-style) signal handler. pfiled_init() installs it
 * for SIGIO through sigaction(). */
116 static void sigaction_handler(int sig, siginfo_t *siginfo, void *arg)
/*
 * Debug helper that formats a summary of an I/O operation and its request.
 *
 * msg: caller-supplied label (complete() passes "complete"); presumably fills
 *      the leading "%s" of the format string -- that argument line is not
 *      visible here.
 * io:  operation to describe. The request's op, offset, size, state, target
 *      and data are reported together with io->retval and io->fdcacheidx.
 *
 * The request target and data are copied into 65-byte local buffers before
 * printing; `end` and `dend` clamp the respective lengths to 64.
 *
 * NOTE(review): the target copy is bounded by `end`, but the data copy uses a
 * fixed 64 rather than `dend`, so up to 64 bytes may be read from the data
 * buffer regardless of req->datalen -- confirm whether `dend` was intended.
 */
121 static void log_io(char *msg, struct io *io)
123 char target[65], data[65];
124 /* null terminate name in case of req->target is less than 63 characters,
125 * and next character after name (aka first byte of next buffer) is not
128 unsigned int end = (io->req->targetlen> 64) ? 64 : io->req->targetlen;
129 unsigned int dend = (io->req->datalen > 64) ? 64 : io->req->datalen;
130 char *req_target = xseg_get_target(io->pfiled->xseg, io->req);
131 char *req_data = xseg_get_data(io->pfiled->xseg, io->req);
132 strncpy(target, req_target, end);
134 strncpy(data, req_data, 64);
138 "%s: fd:%u, op:%u offset: %llu size: %lu retval: %lu, reqstate: %u\n"
139 "target[%u]: '%s', data[%llu]:\n%s------------------\n\n",
141 (unsigned int)io->fdcacheidx, /* this is cacheidx not fd */
142 (unsigned int)io->req->op,
143 (unsigned long long)io->req->offset,
144 (unsigned long)io->req->size,
145 (unsigned long)io->retval,
146 (unsigned int)io->req->state,
147 (unsigned int)io->req->targetlen, target,
148 (unsigned long long)io->req->datalen, data);
/*
 * Take a slot index from the pfiled->free_ops queue and return the matching
 * element of the pfiled->ios array.
 * NOTE(review): handling of an empty queue is not visible in this excerpt;
 * wake_up_next_iothread() is the visible caller.
 */
151 static struct io *alloc_io(struct pfiled *pfiled)
153 xqindex idx = xq_pop_head(&pfiled->free_ops, 1);
156 return pfiled->ios + idx;
/*
 * Give an io slot back to the pfiled->free_ops queue.
 * The slot index is the pointer difference between io and the start of the
 * pfiled->ios array, so io must point into that array.
 */
159 static inline void free_io(struct pfiled *pfiled, struct io *io)
161 xqindex idx = io - pfiled->ios;
163 xq_append_head(&pfiled->free_ops, idx, 1);
/*
 * Finish a request successfully: set XS_SERVED on the request, log it, respond
 * on this peer's port, signal the port returned by xseg_respond(), and
 * atomically drop one reference on the fd cache entry used by this io.
 *
 * NOTE(review): unlike fail(), there is no visible check that
 * io->fdcacheidx >= 0 before indexing the cache -- confirm that every caller
 * has a valid cache index at this point.
 */
166 static void complete(struct pfiled *pfiled, struct io *io)
168 struct xseg_request *req = io->req;
169 req->state |= XS_SERVED;
171 log_io("complete", io);
172 xport p = xseg_respond(pfiled->xseg, req, pfiled->portno, X_ALLOC);
173 xseg_signal(pfiled->xseg, p);
174 __sync_fetch_and_sub(&pfiled->fdcache[io->fdcacheidx].ref, 1);
/*
 * Finish a request with failure: set XS_FAILED on the request, respond on this
 * peer's port and signal the port returned by xseg_respond().
 * The fd cache reference is dropped only when io->fdcacheidx is non-negative,
 * i.e. a negative index is treated as "no cache entry held".
 */
177 static void fail(struct pfiled *pfiled, struct io *io)
179 struct xseg_request *req = io->req;
180 req->state |= XS_FAILED;
183 xport p = xseg_respond(pfiled->xseg, req, pfiled->portno, X_ALLOC);
184 xseg_signal(pfiled->xseg, p);
185 if (io->fdcacheidx >= 0) {
186 __sync_fetch_and_sub(&pfiled->fdcache[io->fdcacheidx].ref, 1);
/*
 * Handler used by dispatch() when no other handler applies to the request op.
 * Writes the text "unknown request op" into the request's data buffer,
 * truncated to req->datalen by snprintf().
 * NOTE(review): how the request is then finished is not visible here.
 */
190 static void handle_unknown(struct pfiled *pfiled, struct io *io)
192 struct xseg_request *req = io->req;
193 char *data = xseg_get_data(pfiled->xseg, req);
194 snprintf(data, req->datalen, "unknown request op");
/*
 * Build the on-disk path for a target into buf.
 *
 * Layout written: <path> + three "cc/" directory components + <target> + NUL.
 * The three components are consecutive character pairs start[0..1],
 * start[2..3] and start[4..5], where `start` is derived from the first ':' in
 * target (the exact adjustment of `start` is not visible here).
 *
 * buf:       output; must hold at least strlen(path) + 9 + targetlen + 1 bytes.
 * path:      NUL-terminated base directory; callers pass pfiled->path or
 *            pfiled->vpath.
 * target:    target name, targetlen bytes.
 * mkdirs:    callers pass 0 or 1; presumably enables the stat()/mkdir() of each
 *            directory level -- the guarding condition is not visible here.
 * Returns a negative value on failure (callers test `< 0`).
 *
 * NOTE(review): directories are created with mode 0600, which has no
 * search (x) permission, so a non-privileged owner could not traverse them
 * -- confirm the intended mode.
 */
198 static int create_path(char *buf, char *path, char *target, uint32_t targetlen, int mkdirs)
202 uint32_t pathlen = strlen(path);
/* copies the prefix without a terminator; later writes terminate buf */
205 strncpy(buf, path, pathlen);
207 start = strchr(target, ':');
/* i = 0, 3, 6: output advances 3 bytes ("cc/") per 2 source characters */
213 for (i = 0; i < 9; i+= 3) {
214 buf[pathlen + i] = start[i - (i/3)];
215 buf[pathlen + i +1] = start[i + 1 - (i/3)];
216 buf[pathlen + i + 2] = '/';
/* terminate after the current component so buf names that directory */
218 buf[pathlen + i + 3] = '\0';
219 if (stat(buf, &st) < 0)
220 if (mkdir(buf, 0600) < 0) {
/* append the full target name after the 9-byte directory prefix */
227 strncpy(&buf[pathlen + 9], target, targetlen);
228 buf[pathlen + 9 + targetlen] = '\0';
/*
 * Return a file descriptor for target, going through the fd cache.
 *
 * Visible flow:
 *  - Under cache_lock, scan all maxfds cache entries. Entries with ref == 0
 *    and the READY flag compete for least-recently-used (smallest `time`);
 *    an entry whose stored name equals target is a cache hit.
 *  - On a hit that is not READY (another thread is opening it), wait on the
 *    entry's condition variable and restart the lookup.
 *  - On a miss, reuse the LRU entry: store the target name in it, drop the
 *    lock, close the entry's previous fd, then try to open the file under
 *    pfiled->path, then under pfiled->vpath, and finally create it under
 *    pfiled->vpath (creating directories).
 *  - Re-take the lock, publish the result, wake waiters with a broadcast,
 *    record the entry index in io->fdcacheidx, stamp the entry with the
 *    incremented handled_reqs counter and take a reference on it.
 *
 * mode: callers pass 0 or 1; its use is not visible in this excerpt.
 * Returns the fd; NOTE(review): the failure return value is not visible here.
 *
 * NOTE(review): `tmp` is sized from pfiled->path_len, yet it is also passed to
 * create_path() with pfiled->vpath. If vpath is longer than path the buffer is
 * too small -- confirm and size it from the longer of the two.
 * NOTE(review): `tmp` is a variable-length array dimensioned with targetlen
 * before targetlen is checked against MAX_FILENAME_SIZE.
 */
233 static int dir_open(struct pfiled *pfiled, struct io *io,
234 char *target, uint32_t targetlen, int mode)
237 struct fdcache_node *ce = NULL;
239 char tmp[pfiled->path_len + targetlen + 10];
242 if (targetlen> MAX_FILENAME_SIZE)
247 pthread_mutex_lock(&pfiled->cache_lock);
251 for (i = 0; i < pfiled->maxfds; i++) {
252 if (pfiled->fdcache[i].ref == 0 && min > pfiled->fdcache[i].time
253 && (pfiled->fdcache[i].flags & READY)) {
254 min = pfiled->fdcache[i].time;
/* prefix match plus a NUL at [targetlen] means the stored name equals target */
259 if (!strncmp(pfiled->fdcache[i].target, target, targetlen)) {
260 if (pfiled->fdcache[i].target[targetlen] == 0) {
261 ce = &pfiled->fdcache[i];
262 /* if any other io thread is currently opening
263 * the file, block until it succeeds or fails
265 if (!(ce->flags & READY)) {
266 pthread_cond_wait(&ce->cond, &pfiled->cache_lock);
267 /* when ready, restart lookup */
270 /* if successfully opened */
272 fd = pfiled->fdcache[i].fd;
276 /* else open failed for the other io thread, so
277 * it should fail for everyone waiting on this
289 /* all cache entries are currently being used */
290 pthread_mutex_unlock(&pfiled->cache_lock);
293 if (pfiled->fdcache[lru].ref){
295 printf("lru(%ld) ref not 0 (%u)\n", lru, pfiled->fdcache[lru].ref);
298 /* make room for new file */
299 ce = &pfiled->fdcache[lru];
300 /* set name here and state to not ready, for any other requests on the
301 * same target that may follow
303 strncpy(ce->target, target, targetlen);
304 ce->target[targetlen] = 0;
306 pthread_mutex_unlock(&pfiled->cache_lock);
309 if (close(ce->fd) < 0){
314 /* try opening it from pithos blocker dir */
315 if (create_path(tmp, pfiled->path, target, targetlen, 0) < 0) {
320 fd = open(tmp, O_RDWR);
322 /* try opening it from the tmp dir */
323 if (create_path(tmp, pfiled->vpath, target, targetlen, 0) < 0)
326 fd = open(tmp, O_RDWR);
328 if (create_path(tmp, pfiled->vpath, target, targetlen, 1) < 0) {
333 fd = open(tmp, O_RDWR | O_CREAT, 0600);
339 /* insert in cache a negative fd to indicate opening error to
340 * any other ios waiting for the file to open
343 /* insert in cache */
345 pthread_mutex_lock(&pfiled->cache_lock);
349 pthread_cond_broadcast(&ce->cond);
351 io->fdcacheidx = lru;
359 pfiled->handled_reqs++;
360 ce->time = pfiled->handled_reqs;
361 __sync_fetch_and_add(&ce->ref, 1);
362 pthread_mutex_unlock(&pfiled->cache_lock);
367 pthread_mutex_unlock(&pfiled->cache_lock);
/*
 * Serve a read or write request against the file backing req's target.
 *
 * The file is obtained with dir_open() (mode 0). Requests carrying XF_FLUSH or
 * XF_FUA are completed without doing I/O. Otherwise data is transferred with
 * pread()/pwrite() at req->offset + req->serviced until req->serviced reaches
 * req->datalen. A read that hits end of file zero-fills the remainder of the
 * data buffer. At the end the request is completed if any bytes were serviced;
 * otherwise the errno text is written into the data buffer.
 *
 * NOTE(review): the conditions guarding `req->datalen = req->serviced` and the
 * zero-fill are not visible here; presumably r < 0 and r == 0 respectively.
 * NOTE(review): the write loop carries the same "end of file" comment and
 * memset() as the read loop, which zeroes the caller's data buffer on a
 * zero-length pwrite() -- looks copied from the read path, confirm intent.
 */
371 static void handle_read_write(struct pfiled *pfiled, struct io *io)
374 struct xseg_request *req = io->req;
375 char *target = xseg_get_target(pfiled->xseg, req);
376 char *data = xseg_get_data(pfiled->xseg, req);
378 fd = dir_open(pfiled, io, target, req->targetlen, 0);
386 printf("0.%p vs %p!\n", (void *)req, (void *)io->req);
388 if (req->flags & (XF_FLUSH | XF_FUA)) {
389 /* No FLUSH/FUA support yet (O_SYNC ?).
390 * note that with FLUSH/size == 0
391 * there will probably be a (uint64_t)-1 offset */
392 complete(pfiled, io);
395 complete(pfiled, io);
/* read path: keep reading until the whole data buffer is filled */
402 while (req->serviced < req->datalen) {
403 r = pread(fd, data + req->serviced,
404 req->datalen - req->serviced,
405 req->offset + req->serviced);
407 req->datalen = req->serviced;
411 /* reached end of file. zero out the rest data buffer */
412 memset(data + req->serviced, 0, req->datalen - req->serviced);
413 req->serviced = req->datalen;
/* write path: keep writing until the whole data buffer is consumed */
421 while (req->serviced < req->datalen) {
422 r = pwrite(fd, data + req->serviced,
423 req->datalen - req->serviced,
424 req->offset + req->serviced);
426 req->datalen = req->serviced;
429 /* reached end of file. zero out the rest data buffer */
430 memset(data + req->serviced, 0, req->datalen - req->serviced);
431 req->serviced = req->datalen;
440 /* if fsync fails, then no bytes serviced correctly */
445 snprintf(data, req->datalen,
446 "wtf, corrupt op %u?\n", req->op);
451 if (req->serviced > 0 ) {
452 complete(pfiled, io);
/* nothing serviced: report the errno text through the data buffer */
455 strerror_r(errno, data, req->datalen);
/*
 * Serve an info request: open the target through dir_open() (mode 0), fstat()
 * it and take st_size as the object size, then complete the request.
 * The request's data buffer is viewed as a struct xseg_reply_info;
 * NOTE(review): the store of `size` into that reply and the error paths are
 * not visible in this excerpt.
 */
461 static void handle_info(struct pfiled *pfiled, struct io *io)
463 struct xseg_request *req = io->req;
467 char *target = xseg_get_target(pfiled->xseg, req);
468 char *data = xseg_get_data(pfiled->xseg, req);
469 struct xseg_reply_info *xinfo = (struct xseg_reply_info *)data;
471 fd = dir_open(pfiled, io, target, req->targetlen, 0);
477 r = fstat(fd, &stat);
484 size = (uint64_t)stat.st_size;
487 complete(pfiled, io);
/*
 * Serve a copy request. The destination is the request target, opened through
 * dir_open() with mode 1. The source is named by the struct xseg_request_copy
 * found in the request data; its path is built under pfiled->path and opened
 * directly. The source is copied with a single sendfile() call of st.st_size
 * bytes and the request is completed on success.
 *
 * NOTE(review): `buf` is a 256-byte allocation, but create_path() writes
 * strlen(pfiled->path) + 9 + targetlen + 1 bytes and pfiled->path may be up to
 * MAX_PATH_SIZE long -- the buffer can be too small. No NULL check or free()
 * of `buf` is visible here.
 * NOTE(review): sendfile() may transfer fewer bytes than requested; a short
 * transfer is reported as "fail in copy" rather than retried.
 */
490 static void handle_copy(struct pfiled *pfiled, struct io *io)
492 struct xseg_request *req = io->req;
493 char *target = xseg_get_target(pfiled->xseg, req);
494 char *data = xseg_get_data(pfiled->xseg, req);
495 struct xseg_request_copy *xcopy = (struct xseg_request_copy *)data;
497 char *buf = malloc(256);
500 dst = dir_open(pfiled, io, target, req->targetlen, 1);
502 fprintf(stderr, "fail in dst\n");
507 if (create_path(buf, pfiled->path, xcopy->target, xcopy->targetlen, 0) < 0) {
512 src = open(buf, O_RDWR);
514 fprintf(stderr, "fail in src\n");
/* offset argument 0 is a null pointer: copy from the current file offset */
520 n = sendfile(dst, src, 0, st.st_size);
521 if (n != st.st_size) {
522 fprintf(stderr, "fail in copy\n");
528 fprintf(stderr, "fail in cp\n");
533 complete(pfiled, io);
/*
 * Serve a delete request. The target is first opened through dir_open()
 * (mode 0); the cache entry it occupies is then invalidated by setting its fd
 * to -1, the target's path under pfiled->vpath is built, and the request is
 * completed.
 *
 * NOTE(review): the call that actually removes the file is not visible in
 * this excerpt.
 * NOTE(review): `buf` is a 255-byte allocation, but create_path() writes
 * strlen(pfiled->vpath) + 9 + targetlen + 1 bytes and pfiled->vpath may be up
 * to MAX_PATH_SIZE long -- the buffer can be too small. No NULL check or
 * free() of `buf` is visible here.
 */
539 static void handle_delete(struct pfiled *pfiled, struct io *io)
541 struct xseg_request *req = io->req;
542 char *buf = malloc(255);
544 char *target = xseg_get_target(pfiled->xseg, req);
546 fd = dir_open(pfiled, io, target, req->targetlen, 0);
548 fprintf(stderr, "fail in dir_open\n");
553 /* 'invalidate' cache entry */
554 if (io->fdcacheidx >= 0) {
555 pfiled->fdcache[io->fdcacheidx].fd = -1;
560 if (create_path(buf, pfiled->vpath, target, req->targetlen, 0) < 0) {
566 complete(pfiled, io);
/*
 * Route a request to its handler based on io->req->op: read/write, info, copy
 * or delete, with handle_unknown() for anything else. In verbose mode a trace
 * line with the io pointer, request pointer and op is printed to stderr first.
 * NOTE(review): the case labels are not visible in this excerpt, so the exact
 * op constants mapped to each handler should be confirmed.
 */
571 static void dispatch(struct pfiled *pfiled, struct io *io)
573 if (cmdline_verbose) {
574 fprintf(stderr, "io: 0x%p, req: 0x%p, op %u\n",
575 (void *)io, (void *)io->req, io->req->op);
578 switch (io->req->op) {
581 handle_read_write(pfiled, io); break;
583 handle_info(pfiled, io); break;
585 handle_copy(pfiled, io); break;
587 handle_delete(pfiled, io); break;
591 handle_unknown(pfiled, io);
/*
 * First step for a freshly accepted request: mark the io as XS_ACCEPTED and
 * hand it to dispatch(). Called from io_loop().
 */
595 static void handle_accepted(struct pfiled *pfiled, struct io *io)
597 struct xseg_request *req = io->req;
599 io->state = XS_ACCEPTED;
601 dispatch(pfiled, io);
/*
 * Pick a free io slot with alloc_io() and wake the thread waiting on that
 * slot's condition variable. The signal is sent while holding io->lock, the
 * same mutex io_loop() holds around its pthread_cond_wait().
 * Returns a struct io pointer; NOTE(review): the handling of a NULL result
 * from alloc_io() and the return statement are not visible here.
 */
604 static struct io* wake_up_next_iothread(struct pfiled *pfiled)
606 struct io *io = alloc_io(pfiled);
609 pthread_mutex_lock(&io->lock);
610 pthread_cond_signal(&io->cond);
611 pthread_mutex_unlock(&io->lock);
/*
 * Thread entry point for one io slot (started by pfiled_init() through
 * pthread_create()).
 *
 * arg: the struct io this thread owns.
 *
 * The thread tries to accept a request from the peer's xseg port. When it
 * gets one it first wakes another io thread, then handles the request itself;
 * otherwise it sleeps on its own condition variable until it is signalled.
 * NOTE(review): the loop construct, the test on `accepted`, the store into
 * io->req and the return are not visible in this excerpt.
 */
616 void *io_loop(void *arg)
618 struct io *io = (struct io *) arg;
619 struct pfiled *pfiled = io->pfiled;
620 struct xseg *xseg = pfiled->xseg;
621 uint32_t portno = pfiled->portno;
622 struct xseg_request *accepted;
626 accepted = xseg_accept(xseg, portno);
/* hand the port over to another thread before doing the (slow) work */
629 wake_up_next_iothread(pfiled);
630 handle_accepted(pfiled, io);
633 pthread_mutex_lock(&io->lock);
635 pthread_cond_wait(&io->cond, &io->lock);
636 pthread_mutex_unlock(&io->lock);
/*
 * Obtain the xseg segment described by spec: parse the spec into a config and
 * try to join the segment; the visible fallback creates the segment from the
 * same config and joins again.
 * The results of xseg_parse_spec() and xseg_create() are explicitly discarded,
 * so a bad spec or failed creation only shows up as a failed join.
 * Returns the result of xseg_join() (pfiled_init() treats a failure as fatal).
 */
643 static struct xseg *join_or_create(char *spec)
645 struct xseg_config config;
648 (void)xseg_parse_spec(spec, &config);
649 xseg = xseg_join(config.type, config.name, "posix", NULL);
653 (void)xseg_create(&config);
654 return xseg_join(config.type, config.name, "posix", NULL);
/*
 * Main-thread loop: wake an io thread, arm the wait on the peer's port with
 * xseg_prepare_wait() and block in xseg_wait_signal() with a timeout argument
 * of 1000000UL.
 * NOTE(review): the loop construct, the timeout's unit and the return value
 * (used by main() as the exit status) are not visible in this excerpt.
 */
657 static int pfiled_loop(struct pfiled *pfiled)
659 struct xseg *xseg = pfiled->xseg;
660 uint32_t portno = pfiled->portno;
661 /* GCC + pthreads glitch? */
665 io = wake_up_next_iothread(pfiled);
666 xseg_prepare_wait(xseg, portno);
667 xseg_wait_signal(xseg, 1000000UL);
/*
 * Initialise a struct pfiled from the parsed command-line globals.
 *
 * Visible steps, in order:
 *  1. Install sigaction_handler for SIGIO (SA_SIGINFO).
 *  2. Size the peer: nr_ops = cmdline_nr_ops, maxfds = 2 * nr_ops, and
 *     allocate the fd cache, free_bufs, thread and io arrays with calloc().
 *  3. Initialise each io slot's condition variable and mutex and the
 *     free_ops queue.
 *  4. Copy the data and volume paths, truncated to MAX_PATH_SIZE, and append a
 *     trailing '/' to each if missing.
 *  5. Initialise the xseg library, join (or create) the segment, bind the
 *     port and set up local signals.
 *  6. Mark fd cache entries READY, start one io_loop() thread per io slot and
 *     initialise cache_lock.
 *
 * main() treats a negative return value as failure.
 *
 * NOTE(review): the fd cache holds maxfds (2 * nr_ops) entries, but the loop
 * that initialises their condition variables and sets READY runs only nr_ops
 * times. dir_open() only evicts READY entries, so the upper half of the cache
 * looks unusable -- confirm whether the bound should be maxfds.
 * NOTE(review): cache_lock is initialised after the io threads are created;
 * a thread that reaches dir_open() first would lock an uninitialised mutex.
 * NOTE(review): path[path_len - 1] is read even when the path is empty
 * (index -1), and appending '/' to a path that already fills MAX_PATH_SIZE
 * writes the terminator one byte past the array. The same applies to vpath.
 * NOTE(review): pthread_create() results are not checked (see the TODO).
 */
673 static int pfiled_init(struct pfiled *pfiled)
679 pfiled->sigevent.sigev_notify = SIGEV_SIGNAL;
680 pfiled->sigevent.sigev_signo = SIGIO;
681 sa.sa_sigaction = sigaction_handler;
682 sa.sa_flags = SA_SIGINFO;
684 if ((ret = sigemptyset(&sa.sa_mask))) {
685 perr(PE, 0, "[sigemptyset]");
689 if ((ret = sigaction(SIGIO, &sa, NULL))) {
690 perr(PE, 0, "[sigaction]");
691 /* FIXME: Since this is an init routine, if it fails the program will
692 * exit and clean its own stuff (mem, sigs etc). We only have to cleanup
693 * anything xseg-related
698 pfiled->nr_ops = cmdline_nr_ops;
699 pfiled->maxfds = 2 * cmdline_nr_ops;
701 pfiled->fdcache = calloc(pfiled->maxfds, sizeof(struct fdcache_node));
702 if(!pfiled->fdcache) {
704 perr(PE, 0, "could not allocate memory [fdcache]");
709 pfiled->free_bufs = calloc(pfiled->nr_ops, sizeof(xqindex));
710 if(!pfiled->free_bufs) {
712 perr(PE, 0, "could not allocate memory [free_bufs]");
716 pfiled->iothread = calloc(pfiled->nr_ops, sizeof(pthread_t));
717 if(!pfiled->iothread) {
719 perr(PE, 0, "could not allocate memory [iothreads]");
723 pfiled->ios = calloc(pfiled->nr_ops, sizeof(struct io));
726 perr(PE, 0, "could not allocate memory [ios]");
730 for (i = 0; i < pfiled->nr_ops; i++) {
731 pfiled->ios[i].pfiled = pfiled;
732 pthread_cond_init(&pfiled->ios[i].cond, NULL);
733 pthread_mutex_init(&pfiled->ios[i].lock, NULL);
736 xq_init_seq(&pfiled->free_ops, pfiled->nr_ops, pfiled->nr_ops,
739 pfiled->handled_reqs = 0;
741 strncpy(pfiled->path, cmdline_path, MAX_PATH_SIZE);
742 pfiled->path[MAX_PATH_SIZE] = 0;
744 strncpy(pfiled->vpath, cmdline_vpath, MAX_PATH_SIZE);
745 pfiled->vpath[MAX_PATH_SIZE] = 0;
747 pfiled->path_len = strlen(pfiled->path);
748 if (pfiled->path[pfiled->path_len -1] != '/'){
749 pfiled->path[pfiled->path_len] = '/';
750 pfiled->path[++pfiled->path_len]= 0;
753 pfiled->vpath_len = strlen(pfiled->vpath);
754 if (pfiled->vpath[pfiled->vpath_len -1] != '/'){
755 pfiled->vpath[pfiled->vpath_len] = '/';
756 pfiled->vpath[++pfiled->vpath_len]= 0;
759 if (xseg_initialize()) {
761 perr(PE, 0, "could not initialize xseg library");
765 pfiled->xseg = join_or_create(cmdline_xseg_spec);
768 perr(PE, 0, "could not join xseg with spec '%s'\n",
770 goto out_with_xseginit;
773 pfiled->xport = xseg_bind_port(pfiled->xseg, cmdline_portno);
774 if (!pfiled->xport) {
776 perr(PE, 0, "could not bind to xseg port %ld", cmdline_portno);
777 goto out_with_xsegjoin;
780 pfiled->portno = xseg_portno(pfiled->xseg, pfiled->xport);
781 perr(PI, 0, "filed on port %u/%u\n",
782 pfiled->portno, pfiled->xseg->config.nr_ports);
784 if (xseg_init_local_signal(pfiled->xseg, pfiled->portno) < 0){
785 printf("cannot int local signals\n");
789 for (i = 0; i < pfiled->nr_ops; i++) {
790 pthread_cond_init(&pfiled->fdcache[i].cond, NULL);
791 pfiled->fdcache[i].flags = READY;
793 for (i = 0; i < pfiled->nr_ops; i++) {
795 * TODO: error check + cond variable to stop io from starting
796 * unless all threads are created successfully
798 pthread_create(pfiled->iothread + i, NULL, io_loop, (void *) (pfiled->ios + i));
800 pthread_mutex_init(&pfiled->cache_lock, NULL);
805 xseg_leave(pfiled->xseg);
// Parse a base-10 integer with strtol(). The input counts as valid only when
// at least one character was consumed and nothing follows the number.
// NOTE(review): the values returned for valid and invalid input are not
// visible here. The return type is int while strtol() yields a long, and
// callers store the result in long globals -- confirm the range handling.
812 static int safe_atoi(char *s)
817 l = strtol(s, &endp, 10);
818 if (s != endp && *endp == '\0')
// Parse the command line into the cmdline_* globals.
// Options accepted by getopt(): -h, -p <arg>, -n <arg>, -g <arg>, -v.
// Numeric arguments go through safe_atoi(); the xseg spec is strdup'ed.
// After option parsing the values are checked: the port must be >= 0, the
// operation count >= 1, the xseg spec present, and two positional arguments
// (path and vpath) must be given; they are strdup'ed into cmdline_path and
// cmdline_vpath. Every failure is reported through perr(PFE, ...).
// NOTE(review): argv[0]/argv[1] are used as the positional arguments, so argc
// and argv are presumably advanced past the options on lines not visible here.
824 static void parse_cmdline(int argc, char **argv)
826 char *argv0 = argv[0];
832 c = getopt(argc, argv, "hp:n:g:v");
838 perr(PFE, 0, "Unknown option: -%c", optopt);
841 perr(PFE, 0, "Option -%c requires an argument",
849 cmdline_portno = safe_atoi(optarg);
852 cmdline_nr_ops = safe_atoi(optarg);
855 /* FIXME: Max length of spec? strdup, eww */
856 cmdline_xseg_spec = strdup(optarg);
857 if (!cmdline_xseg_spec)
858 perr(PFE, 0, "out of memory");
869 /* Sanity check for all arguments */
870 if (cmdline_portno < 0) {
872 perr(PFE, 0, "no or invalid port specified");
874 if (cmdline_nr_ops < 1) {
876 perr(PFE, 0, "specified outstanding request count is invalid");
878 if (!cmdline_xseg_spec) {
880 perr(PFE, 0, "xseg specification is mandatory");
885 perr(PFE, 0, "path and vpath specification is mandatory");
888 cmdline_path = strdup(argv[0]);
890 perr(PFE, 0, "out of memory");
892 cmdline_vpath = strdup(argv[1]);
894 perr(PFE, 0, "out of memory");
/*
 * Program entry point: parse the command line, log the chosen port and
 * operation count, initialise the pfiled state and run pfiled_loop(), whose
 * return value becomes the exit status.
 * A negative result from pfiled_init() is reported through perr(PFE, ...);
 * presumably that call does not return -- confirm against perr()'s contract.
 * NOTE(review): the log line prints cmdline_nr_ops with "%lu" although the
 * variable is a (signed) long; "%ld" would match its type.
 */
897 int main(int argc, char **argv)
899 struct pfiled pfiled;
902 parse_cmdline(argc, argv);
904 perr(PI, 0, "p = %ld, nr_ops = %lu\n", cmdline_portno, cmdline_nr_ops);
906 if (pfiled_init(&pfiled) < 0)
907 perr(PFE, 0, "failed to initialize pfiled");
909 return pfiled_loop(&pfiled);