2 * The Pithos File Blocker Peer (pfiled)
18 #include <sys/sendfile.h>
20 #include <xseg/xseg.h>
21 #include <xseg/protocol.h>
23 #include "common.h" /* FIXME: */
/*
 * Size limits. MAX_PATH_SIZE bounds the path/vpath buffers of struct pfiled;
 * MAX_FILENAME_SIZE bounds fdcache_node.target and is the limit dir_open()
 * checks targetlen against.
 */
25 #define MAX_PATH_SIZE 1024
26 #define MAX_FILENAME_SIZE 255
28 /* default concurrency level (number of threads) */
29 #define DEFAULT_NR_OPS 16
/*
 * NOTE(review): the zero-block hash literal below is 63 hex characters. If it
 * is meant to be a SHA-256 hex digest (64 characters; the SHA-256 of empty
 * input ends in ...7852b855), the final '5' appears to be missing. Confirm
 * against Pithos before relying on it for comparisons.
 */
31 /* Pithos hash for the zero block
32 * FIXME: Should it be hardcoded?
35 "e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b85"
38 * Globals, holding command-line arguments
40 long cmdline_portno = -1;
41 char *cmdline_xseg_spec = NULL;
42 char *cmdline_path = NULL;
43 char *cmdline_vpath = NULL;
44 long cmdline_nr_ops = DEFAULT_NR_OPS;
45 long cmdline_verbose = 0;
/*
 * parse_cmdline() fills in the globals above:
 *  - -p sets cmdline_portno, -n sets cmdline_nr_ops and -g sets
 *    cmdline_xseg_spec (strdup'ed);
 *  - the two positional arguments set cmdline_path and cmdline_vpath;
 *  - -v presumably sets cmdline_verbose (that case is not visible).
 * cmdline_portno stays -1 until -p is given, and parse_cmdline() rejects a
 * negative port.
 */
/*
 * Print the command-line synopsis for argv0. The output stream and the
 * return value are on lines not visible here.
 * NOTE(review): -p and -g are shown in brackets as optional, but
 * parse_cmdline() treats a missing port and a missing xseg spec as errors.
 * -h is accepted by getopt but is not listed.
 */
47 static int usage(char *argv0)
50 "Usage: %s <PATH> <VPATH> [-p PORT] [-g XSEG_SPEC] [-n NR_OPS] [-v]\n\n"
52 "\tPATH: path to pithos data blocks\n"
53 "\tVPATH: path to modified volume blocks\n"
54 "\tPORT: xseg port to listen for requests on\n"
55 "\tXSEG_SPEC: xseg spec as 'type:name:nr_ports:nr_requests:"
56 "request_size:extra_size:page_shift'\n"
57 "\tNR_OPS: number of outstanding xseg requests\n"
58 "\t-v: verbose mode\n",
64 /* fdcache_node flags */
65 #define READY (1 << 1)
/*
 * READY is set on an fdcache entry once its open attempt has finished.
 * While it is clear, dir_open() waits on the entry's cond. The LRU scan only
 * evicts idle entries that have it set.
 */
67 /* fdcache node info */
/*
 * ref: number of ios currently using the entry. It is taken in dir_open()
 * and dropped in complete()/fail(); only entries with ref == 0 are eviction
 * candidates.
 * NOTE(review): volatile provides no inter-thread ordering. ref is changed
 * with __sync builtins but read with plain loads (e.g. in the LRU scan), so
 * C11 atomics would be the safer choice.
 */
70 volatile unsigned int ref;
/* time: LRU stamp, refreshed from pfiled->handled_reqs in dir_open(). */
71 volatile unsigned long time;
72 volatile unsigned int flags;
/* target: NUL-terminated name of the cached file. */
74 char target[MAX_FILENAME_SIZE + 1];
80 struct xseg_port *xport;
87 struct sigevent sigevent;
/* handled_reqs: counter used as the fdcache LRU clock; dir_open() updates it
 * while holding cache_lock. */
90 uint64_t handled_reqs;
/* fdcache: array of maxfds entries, allocated in pfiled_init(). */
92 struct fdcache_node *fdcache;
/* cache_lock: serialises fdcache lookup and replacement in dir_open(). */
94 pthread_mutex_t cache_lock;
/* path/vpath: pithos block directory and modified-volume block directory.
 * pfiled_init() appends a trailing '/' if it is missing. */
95 char path[MAX_PATH_SIZE + 1];
96 char vpath[MAX_PATH_SIZE + 1];
100 * pfiled specific structure
101 * containing information on a pending I/O operation
/* pfiled: back-pointer to the owning daemon state. */
104 struct pfiled *pfiled;
/* req: the xseg request this io is servicing. */
105 struct xseg_request *req;
/* lock (with the io's cond): io_loop() parks the io thread here and
 * wake_up_next_iothread() wakes it. */
110 pthread_mutex_t lock;
114 static unsigned long sigaction_count;
/*
 * SIGIO handler that pfiled_init() installs with SA_SIGINFO. Its body is not
 * visible here; presumably it counts deliveries in sigaction_count (confirm).
 */
116 static void sigaction_handler(int sig, siginfo_t *siginfo, void *arg)
/*
 * Log a description of an io and its request: cache index, op, offset, size,
 * retval, state and serviced count. It also includes up to 64 bytes of the
 * target name and of the data buffer, copied into local 65-byte buffers.
 *
 * NOTE(review): the data buffer is copied with a fixed length of 64 rather
 * than dend. For a request with datalen < 64 and no NUL in its data,
 * strncpy() can therefore read past the request's data buffer.
 * strncpy(data, req_data, dend) looks intended; confirm.
 */
121 static void log_io(char *msg, struct io *io)
123 char target[65], data[65];
124 /* null terminate name in case of req->target is less than 63 characters,
125 * and next character after name (aka first byte of next buffer) is not
128 unsigned int end = (io->req->targetlen> 64) ? 64 : io->req->targetlen;
129 unsigned int dend = (io->req->datalen > 64) ? 64 : io->req->datalen;
130 char *req_target = xseg_get_target(io->pfiled->xseg, io->req);
131 char *req_data = xseg_get_data(io->pfiled->xseg, io->req);
132 strncpy(target, req_target, end);
134 strncpy(data, req_data, 64);
138 "%s: fd:%u, op:%u offset: %llu size: %lu retval: %lu, reqstate: %u, serviced: %u\n"
139 "target[%u]: '%s', data[%llu]:\n%s------------------\n\n",
141 (unsigned int)io->fdcacheidx, /* this is cacheidx not fd */
142 (unsigned int)io->req->op,
143 (unsigned long long)io->req->offset,
144 (unsigned long)io->req->size,
145 (unsigned long)io->retval,
146 (unsigned int)io->req->state,
147 (unsigned long)io->req->serviced,
/* NOTE(review): serviced is passed as unsigned long, but its conversion in
 * the format string is "%u". It needs "%lu" to avoid undefined behaviour. */
148 (unsigned int)io->req->targetlen, target,
149 (unsigned long long)io->req->datalen, data);
/*
 * Take a free io slot: pop an index from pfiled->free_ops and return the
 * matching element of pfiled->ios. The empty-queue handling is on lines not
 * visible here (presumably a NULL return when no index is available).
 */
152 static struct io *alloc_io(struct pfiled *pfiled)
154 xqindex idx = xq_pop_head(&pfiled->free_ops, 1);
157 return pfiled->ios + idx;
/*
 * Return io to the pool: push its index (its offset within pfiled->ios) back
 * onto the head of pfiled->free_ops.
 */
160 static inline void free_io(struct pfiled *pfiled, struct io *io)
162 xqindex idx = io - pfiled->ios;
164 xq_append_head(&pfiled->free_ops, idx, 1);
/*
 * Finish io successfully:
 *  - mark the request XS_SERVED;
 *  - hand it back with xseg_respond() on our port;
 *  - signal the port that xseg_respond() returned;
 *  - drop the fdcache reference taken by dir_open().
 *
 * NOTE(review): unlike fail(), this decrements fdcache[io->fdcacheidx].ref
 * without checking that fdcacheidx >= 0, so it assumes the io holds a valid
 * cache reference. The xseg_respond() result is also signalled without an
 * error check; confirm that xseg_signal() tolerates a failed respond.
 */
167 static void complete(struct pfiled *pfiled, struct io *io)
169 struct xseg_request *req = io->req;
170 req->state |= XS_SERVED;
172 log_io("complete", io);
173 xport p = xseg_respond(pfiled->xseg, req, pfiled->portno, X_ALLOC);
174 xseg_signal(pfiled->xseg, p);
175 __sync_fetch_and_sub(&pfiled->fdcache[io->fdcacheidx].ref, 1);
/*
 * Finish io with an error: mark the request XS_FAILED, then respond and
 * signal as complete() does. The fdcache reference is dropped only when the
 * io holds one (fdcacheidx >= 0).
 */
178 static void fail(struct pfiled *pfiled, struct io *io)
180 struct xseg_request *req = io->req;
181 req->state |= XS_FAILED;
184 xport p = xseg_respond(pfiled->xseg, req, pfiled->portno, X_ALLOC);
185 xseg_signal(pfiled->xseg, p);
186 if (io->fdcacheidx >= 0) {
187 __sync_fetch_and_sub(&pfiled->fdcache[io->fdcacheidx].ref, 1);
/*
 * Handle a request whose op is not recognised: write "unknown request op"
 * (truncated to datalen) into its data buffer. The rest of the handler,
 * presumably a call to fail(), is not visible here.
 */
191 static void handle_unknown(struct pfiled *pfiled, struct io *io)
193 struct xseg_request *req = io->req;
194 char *data = xseg_get_data(pfiled->xseg, req);
195 snprintf(data, req->datalen, "unknown request op");
/*
 * Build the on-disk path of target under the directory path, into buf:
 *
 *     <path><t0t1>/<t2t3>/<t4t5>/<target>
 *
 * That is three directory levels, each named after a pair of characters from
 * the first six characters of target, followed by the full target name.
 * While building the prefix, each level that stat() cannot find is created
 * with mkdir(). Presumably this only happens when mkdirs is set; the guarding
 * line is not visible.
 *
 * buf must hold at least strlen(path) + 9 + targetlen + 1 bytes.
 * Callers treat a negative return as failure.
 *
 * NOTE(review):
 *  - Nothing visible checks targetlen >= 6 before target[0..5] is read.
 *  - mkdir() uses mode 0600, which lacks the search (x) bit. Unless the
 *    process bypasses permission checks (e.g. runs as root), the created
 *    directories cannot be traversed; 0700 is likely intended.
 */
199 static int create_path(char *buf, char *path, char *target, uint32_t targetlen, int mkdirs)
203 uint32_t pathlen = strlen(path);
205 strncpy(buf, path, pathlen);
207 for (i = 0; i < 9; i+= 3) {
208 buf[pathlen + i] = target[i - (i/3)];
209 buf[pathlen + i +1] = target[i + 1 - (i/3)];
210 buf[pathlen + i + 2] = '/';
212 buf[pathlen + i + 3] = '\0';
213 if (stat(buf, &st) < 0)
214 if (mkdir(buf, 0600) < 0) {
221 strncpy(&buf[pathlen + 9], target, targetlen);
222 buf[pathlen + 9 + targetlen] = '\0';
/*
 * Return an open fd for target, going through the fd cache (pfiled->fdcache).
 *
 * Lookup: under cache_lock, the cache is scanned for an entry named target.
 * While scanning, the idle (ref == 0) READY entry with the smallest time is
 * tracked as the eviction candidate.
 *  - A hit that another thread is still opening is waited for on the entry's
 *    cond, and then the lookup restarts.
 *  - A hit whose open failed for the other thread fails here too.
 *  - A usable hit returns its fd.
 *
 * Miss: the LRU entry is claimed and renamed to target. The lock is dropped
 * while the entry's old fd is closed and the new file is opened. The result
 * is then published under the lock, and waiters are woken with a broadcast.
 *
 * Files are tried in this order:
 *  1. under pfiled->path;
 *  2. under pfiled->vpath;
 *  3. created under vpath, building the directories (create_path mkdirs = 1)
 *     and opening with O_CREAT, mode 0600. The condition that selects this
 *     step (presumably mode) is on lines not visible here.
 *
 * On success, the entry's time is refreshed from handled_reqs and its ref is
 * taken; on the miss path io->fdcacheidx is set to the claimed slot. The
 * caller releases the reference through complete()/fail(). targetlen is
 * checked against MAX_FILENAME_SIZE first. The error return values are on
 * lines not visible here.
 *
 * NOTE(review): fdcache is calloc()ed, so a never-used entry has fd == 0.
 * Confirm that the (not visible) guard around close(ce->fd) skips it;
 * otherwise stdin gets closed. handle_delete() also stores fd = -1 into
 * entries.
 */
227 static int dir_open(struct pfiled *pfiled, struct io *io,
228 char *target, uint32_t targetlen, int mode)
231 struct fdcache_node *ce = NULL;
/* NOTE(review): tmp is sized from path_len, but create_path() also writes
 * pfiled->vpath into it below. If vpath is longer than path, this overflows
 * the stack buffer; size it from the larger of path_len and vpath_len. */
233 char tmp[pfiled->path_len + targetlen + 10];
236 if (targetlen> MAX_FILENAME_SIZE)
241 pthread_mutex_lock(&pfiled->cache_lock);
245 for (i = 0; i < pfiled->maxfds; i++) {
246 if (pfiled->fdcache[i].ref == 0 && min > pfiled->fdcache[i].time
247 && (pfiled->fdcache[i].flags & READY)) {
248 min = pfiled->fdcache[i].time;
253 if (!strncmp(pfiled->fdcache[i].target, target, targetlen)) {
254 if (pfiled->fdcache[i].target[targetlen] == 0) {
255 ce = &pfiled->fdcache[i];
256 /* if any other io thread is currently opening
257 * the file, block until it succeeds or fails
259 if (!(ce->flags & READY)) {
260 pthread_cond_wait(&ce->cond, &pfiled->cache_lock);
261 /* when ready, restart lookup */
264 /* if successfully opened */
266 fd = pfiled->fdcache[i].fd;
270 /* else open failed for the other io thread, so
271 * it should fail for everyone waiting on this
283 /* all cache entries are currently being used */
284 pthread_mutex_unlock(&pfiled->cache_lock);
287 if (pfiled->fdcache[lru].ref){
289 printf("lru(%ld) ref not 0 (%u)\n", lru, pfiled->fdcache[lru].ref);
292 /* make room for new file */
293 ce = &pfiled->fdcache[lru];
294 /* set name here and state to not ready, for any other requests on the
295 * same target that may follow
297 strncpy(ce->target, target, targetlen);
298 ce->target[targetlen] = 0;
300 pthread_mutex_unlock(&pfiled->cache_lock);
303 if (close(ce->fd) < 0){
308 /* try opening it from pithos blocker dir */
309 if (create_path(tmp, pfiled->path, target, targetlen, 0) < 0) {
314 fd = open(tmp, O_RDWR);
316 /* try opening it from the tmp dir */
317 if (create_path(tmp, pfiled->vpath, target, targetlen, 0) < 0)
320 fd = open(tmp, O_RDWR);
322 if (create_path(tmp, pfiled->vpath, target, targetlen, 1) < 0) {
327 fd = open(tmp, O_RDWR | O_CREAT, 0600);
333 /* insert in cache a negative fd to indicate opening error to
334 * any other ios waiting for the file to open
337 /* insert in cache */
339 pthread_mutex_lock(&pfiled->cache_lock);
343 pthread_cond_broadcast(&ce->cond);
345 io->fdcacheidx = lru;
353 pfiled->handled_reqs++;
354 ce->time = pfiled->handled_reqs;
355 __sync_fetch_and_add(&ce->ref, 1);
356 pthread_mutex_unlock(&pfiled->cache_lock);
361 pthread_mutex_unlock(&pfiled->cache_lock);
/*
 * Service a read or write request against target's cached fd.
 *
 * FLUSH/FUA requests are completed immediately without any syncing. Lines
 * 387-389 are not visible; they presumably hold a second early-completion
 * case.
 *
 * Reads loop with pread() at req->offset + req->serviced until datalen
 * bytes have been serviced.
 *  - On a pread() error (presumably r < 0), datalen is truncated to what was
 *    read.
 *  - At end of file, the rest of the data buffer is zero-filled and counted
 *    as serviced.
 *
 * Writes loop with pwrite() in the same way; the write path also appears to
 * fsync (see the fsync comment below).
 *
 * Any other op writes a "corrupt op" message into the data buffer. The
 * request completes if anything was serviced. Otherwise the errno text is
 * written into the data buffer; the failure call itself is not visible.
 */
365 static void handle_read_write(struct pfiled *pfiled, struct io *io)
368 struct xseg_request *req = io->req;
369 char *target = xseg_get_target(pfiled->xseg, req);
370 char *data = xseg_get_data(pfiled->xseg, req);
372 fd = dir_open(pfiled, io, target, req->targetlen, 0);
380 printf("0.%p vs %p!\n", (void *)req, (void *)io->req);
382 if (req->flags & (XF_FLUSH | XF_FUA)) {
383 /* No FLUSH/FUA support yet (O_SYNC ?).
384 * note that with FLUSH/size == 0
385 * there will probably be a (uint64_t)-1 offset */
386 complete(pfiled, io);
389 complete(pfiled, io);
396 while (req->serviced < req->datalen) {
397 r = pread(fd, data + req->serviced,
398 req->datalen - req->serviced,
399 req->offset + req->serviced);
401 req->datalen = req->serviced;
405 /* reached end of file. zero out the rest data buffer */
406 memset(data + req->serviced, 0, req->datalen - req->serviced);
407 req->serviced = req->datalen;
415 while (req->serviced < req->datalen) {
416 r = pwrite(fd, data + req->serviced,
417 req->datalen - req->serviced,
418 req->offset + req->serviced);
420 req->datalen = req->serviced;
423 fprintf(stderr, "write returned 0\n");
424 memset(data + req->serviced, 0, req->datalen - req->serviced);
425 req->serviced = req->datalen;
/* NOTE(review): when pwrite() returns 0, the remaining *source* bytes are
 * zeroed and the request is reported as fully serviced, even though nothing
 * more was written. This silently acknowledges a short write. */
434 /* if fsync fails, then no bytes serviced correctly */
439 snprintf(data, req->datalen,
440 "wtf, corrupt op %u?\n", req->op);
445 if (req->serviced > 0 ) {
446 complete(pfiled, io);
449 strerror_r(errno, data, req->datalen);
/*
 * Report target's size: fstat() the cached fd and take st_size. The data
 * buffer is viewed as a struct xseg_reply_info for the reply. The store into
 * xinfo and the error paths are on lines not visible here.
 */
455 static void handle_info(struct pfiled *pfiled, struct io *io)
457 struct xseg_request *req = io->req;
461 char *target = xseg_get_target(pfiled->xseg, req);
462 char *data = xseg_get_data(pfiled->xseg, req);
463 struct xseg_reply_info *xinfo = (struct xseg_reply_info *)data;
465 fd = dir_open(pfiled, io, target, req->targetlen, 0);
471 r = fstat(fd, &stat);
478 size = (uint64_t)stat.st_size;
481 complete(pfiled, io);
/*
 * Copy a pithos block into target. The request's data buffer holds a
 * struct xseg_request_copy naming the source.
 *  - The destination is opened through the fd cache with mode 1.
 *  - The source path is built under pfiled->path and opened directly.
 *  - The contents are transferred with a single sendfile() of st.st_size
 *    bytes. st is presumably filled by fstat(src) on a line not visible here.
 *
 * NOTE(review):
 *  - buf is a fixed 256-byte malloc() that is not NULL-checked.
 *    create_path() needs path_len + 9 + targetlen + 1 bytes, and path may be
 *    up to MAX_PATH_SIZE, so this can overflow. Size buf from path_len and
 *    xcopy->targetlen instead.
 *  - sendfile() may transfer fewer bytes than requested. A short count is
 *    treated as failure rather than retried.
 *  - The source is opened O_RDWR although it is only read.
 */
484 static void handle_copy(struct pfiled *pfiled, struct io *io)
486 struct xseg_request *req = io->req;
487 char *target = xseg_get_target(pfiled->xseg, req);
488 char *data = xseg_get_data(pfiled->xseg, req);
489 struct xseg_request_copy *xcopy = (struct xseg_request_copy *)data;
491 char *buf = malloc(256);
494 dst = dir_open(pfiled, io, target, req->targetlen, 1);
496 fprintf(stderr, "fail in dst\n");
501 if (create_path(buf, pfiled->path, xcopy->target, xcopy->targetlen, 0) < 0) {
506 src = open(buf, O_RDWR);
508 XSEGLOG("fail in src %s\n", buf);
515 n = sendfile(dst, src, 0, st.st_size);
516 if (n != st.st_size) {
517 fprintf(stderr, "fail in copy\n");
523 fprintf(stderr, "fail in cp\n");
528 complete(pfiled, io);
/*
 * Delete target from the modified-volume directory:
 *  - open it through the fd cache;
 *  - mark its cache entry's fd as -1;
 *  - build its path under pfiled->vpath.
 * The unlink/close calls are on lines not visible here.
 *
 * NOTE(review): buf is a fixed 255-byte malloc() that is not NULL-checked,
 * while create_path() needs vpath_len + 9 + targetlen + 1 bytes, so a heap
 * overflow is possible. The cache entry is also modified without holding
 * cache_lock.
 */
534 static void handle_delete(struct pfiled *pfiled, struct io *io)
536 struct xseg_request *req = io->req;
537 char *buf = malloc(255);
539 char *target = xseg_get_target(pfiled->xseg, req);
541 fd = dir_open(pfiled, io, target, req->targetlen, 0);
543 fprintf(stderr, "fail in dir_open\n");
548 /* 'invalidate' cache entry */
549 if (io->fdcacheidx >= 0) {
550 pfiled->fdcache[io->fdcacheidx].fd = -1;
555 if (create_path(buf, pfiled->vpath, target, req->targetlen, 0) < 0) {
561 complete(pfiled, io);
/*
 * Route an accepted io to the handler for io->req->op. The case labels are on
 * lines not visible here; handle_unknown() is presumably the default case.
 * Prints the io/request pointers and op to stderr when cmdline_verbose is set.
 */
566 static void dispatch(struct pfiled *pfiled, struct io *io)
568 if (cmdline_verbose) {
569 fprintf(stderr, "io: 0x%p, req: 0x%p, op %u\n",
570 (void *)io, (void *)io->req, io->req->op);
573 switch (io->req->op) {
576 handle_read_write(pfiled, io); break;
578 handle_info(pfiled, io); break;
580 handle_copy(pfiled, io); break;
582 handle_delete(pfiled, io); break;
586 handle_unknown(pfiled, io);
/*
 * Mark io as XS_ACCEPTED and dispatch it. The local req is not used on the
 * visible lines.
 */
590 static void handle_accepted(struct pfiled *pfiled, struct io *io)
592 struct xseg_request *req = io->req;
594 io->state = XS_ACCEPTED;
596 dispatch(pfiled, io);
/*
 * Hand the next free io slot to its thread. alloc_io() takes a slot, and the
 * slot's cond is signalled under the slot's lock, so that the thread parked
 * in io_loop() goes back to accepting requests. Returns the io; the handling
 * of an empty pool is on lines not visible here.
 *
 * NOTE(review): the signal carries no predicate. If the target thread is not
 * yet blocked in pthread_cond_wait(), the wakeup is lost.
 */
599 static struct io* wake_up_next_iothread(struct pfiled *pfiled)
601 struct io *io = alloc_io(pfiled);
604 pthread_mutex_lock(&io->lock);
605 pthread_cond_signal(&io->cond);
606 pthread_mutex_unlock(&io->lock);
/*
 * Body of each io thread; arg is the thread's own struct io.
 *  1. Try xseg_accept() on the daemon's port.
 *  2. If a request was accepted, wake another idle thread so that someone
 *     keeps accepting, then service the request with handle_accepted(). The
 *     acceptance check and the assignment to io->req are presumably on lines
 *     not visible here.
 *  3. Park on the thread's own cond until wake_up_next_iothread() signals it.
 */
611 void *io_loop(void *arg)
613 struct io *io = (struct io *) arg;
614 struct pfiled *pfiled = io->pfiled;
615 struct xseg *xseg = pfiled->xseg;
616 uint32_t portno = pfiled->portno;
617 struct xseg_request *accepted;
621 accepted = xseg_accept(xseg, portno);
624 wake_up_next_iothread(pfiled);
625 handle_accepted(pfiled, io);
628 pthread_mutex_lock(&io->lock);
630 pthread_cond_wait(&io->cond, &io->lock);
631 pthread_mutex_unlock(&io->lock);
/*
 * Join the xseg segment described by spec, using the "posix" signal driver.
 * If the join fails, the segment is created and joined again; the early
 * return on a successful first join is presumably on lines not visible here.
 * Returns the result of xseg_join().
 *
 * NOTE(review): the results of xseg_parse_spec() and xseg_create() are
 * ignored. If parsing fails, config is used without having been filled in.
 */
638 static struct xseg *join_or_create(char *spec)
640 struct xseg_config config;
643 (void)xseg_parse_spec(spec, &config);
644 xseg = xseg_join(config.type, config.name, "posix", NULL);
648 (void)xseg_create(&config);
649 return xseg_join(config.type, config.name, "posix", NULL);
/*
 * Main-thread loop: wake an idle io thread so that it accepts requests, then
 * prepare to wait on our port and block in xseg_wait_signal(). The timeout
 * argument is 1000000UL; its unit is defined by the xseg API. The loop
 * construct and the exit condition are on lines not visible here.
 */
652 static int pfiled_loop(struct pfiled *pfiled)
654 struct xseg *xseg = pfiled->xseg;
655 uint32_t portno = pfiled->portno;
656 /* GCC + pthreads glitch? */
660 io = wake_up_next_iothread(pfiled);
661 xseg_prepare_wait(xseg, portno);
662 xseg_wait_signal(xseg, 1000000UL);
/*
 * Initialise pfiled from the cmdline_* globals:
 *  - install the SIGIO handler;
 *  - allocate the fd cache (maxfds = 2 * nr_ops entries), the io slots and
 *    their thread handles, and seed the free-io queue;
 *  - copy path/vpath, adding a trailing '/';
 *  - initialise xseg, join or create the segment, bind cmdline_portno and
 *    set up local signalling;
 *  - mark cache entries READY and start one io_loop() thread per io slot.
 *
 * main() treats a negative return as fatal. The error paths jump to labels
 * that release xseg resources (e.g. xseg_leave()).
 */
668 static int pfiled_init(struct pfiled *pfiled)
674 pfiled->sigevent.sigev_notify = SIGEV_SIGNAL;
675 pfiled->sigevent.sigev_signo = SIGIO;
676 sa.sa_sigaction = sigaction_handler;
677 sa.sa_flags = SA_SIGINFO;
679 if ((ret = sigemptyset(&sa.sa_mask))) {
680 perr(PE, 0, "[sigemptyset]");
684 if ((ret = sigaction(SIGIO, &sa, NULL))) {
685 perr(PE, 0, "[sigaction]");
686 /* FIXME: Since this is an init routine, if it fails the program will
687 * exit and clean its own stuff (mem, sigs etc). We only have to cleanup
688 * anything xseg-related
693 pfiled->nr_ops = cmdline_nr_ops;
694 pfiled->maxfds = 2 * cmdline_nr_ops;
696 pfiled->fdcache = calloc(pfiled->maxfds, sizeof(struct fdcache_node));
697 if(!pfiled->fdcache) {
699 perr(PE, 0, "could not allocate memory [fdcache]");
704 pfiled->free_bufs = calloc(pfiled->nr_ops, sizeof(xqindex));
705 if(!pfiled->free_bufs) {
707 perr(PE, 0, "could not allocate memory [free_bufs]");
711 pfiled->iothread = calloc(pfiled->nr_ops, sizeof(pthread_t));
712 if(!pfiled->iothread) {
714 perr(PE, 0, "could not allocate memory [iothreads]");
718 pfiled->ios = calloc(pfiled->nr_ops, sizeof(struct io));
721 perr(PE, 0, "could not allocate memory [ios]");
725 for (i = 0; i < pfiled->nr_ops; i++) {
726 pfiled->ios[i].pfiled = pfiled;
727 pthread_cond_init(&pfiled->ios[i].cond, NULL);
728 pthread_mutex_init(&pfiled->ios[i].lock, NULL);
731 xq_init_seq(&pfiled->free_ops, pfiled->nr_ops, pfiled->nr_ops,
734 pfiled->handled_reqs = 0;
/*
 * NOTE(review): appending the trailing '/' below has two edge cases:
 *  - an empty path reads path[-1] (path_len - 1 underflows);
 *  - a path of exactly MAX_PATH_SIZE characters stores '/' at
 *    path[MAX_PATH_SIZE] and then the NUL one byte past the buffer.
 * The same applies to vpath.
 */
736 strncpy(pfiled->path, cmdline_path, MAX_PATH_SIZE);
737 pfiled->path[MAX_PATH_SIZE] = 0;
739 strncpy(pfiled->vpath, cmdline_vpath, MAX_PATH_SIZE);
740 pfiled->vpath[MAX_PATH_SIZE] = 0;
742 pfiled->path_len = strlen(pfiled->path);
743 if (pfiled->path[pfiled->path_len -1] != '/'){
744 pfiled->path[pfiled->path_len] = '/';
745 pfiled->path[++pfiled->path_len]= 0;
748 pfiled->vpath_len = strlen(pfiled->vpath);
749 if (pfiled->vpath[pfiled->vpath_len -1] != '/'){
750 pfiled->vpath[pfiled->vpath_len] = '/';
751 pfiled->vpath[++pfiled->vpath_len]= 0;
754 if (xseg_initialize()) {
756 perr(PE, 0, "could not initialize xseg library");
760 pfiled->xseg = join_or_create(cmdline_xseg_spec);
763 perr(PE, 0, "could not join xseg with spec '%s'\n",
765 goto out_with_xseginit;
768 pfiled->xport = xseg_bind_port(pfiled->xseg, cmdline_portno, NULL);
769 if (!pfiled->xport) {
771 perr(PE, 0, "could not bind to xseg port %ld", cmdline_portno);
772 goto out_with_xsegjoin;
775 pfiled->portno = xseg_portno(pfiled->xseg, pfiled->xport);
776 perr(PI, 0, "filed on port %u/%u\n",
777 pfiled->portno, pfiled->xseg->config.nr_ports);
779 if (xseg_init_local_signal(pfiled->xseg, pfiled->portno) < 0){
780 printf("cannot int local signals\n");
/*
 * NOTE(review): this loop initialises cond and READY for only nr_ops
 * entries, but fdcache has maxfds = 2 * nr_ops entries. The remaining
 * entries keep an uninitialised cond and flags == 0, so the LRU scan in
 * dir_open() (which requires READY) never uses them. The loop should
 * iterate to maxfds.
 */
784 for (i = 0; i < pfiled->nr_ops; i++) {
785 pthread_cond_init(&pfiled->fdcache[i].cond, NULL);
786 pfiled->fdcache[i].flags = READY;
788 for (i = 0; i < pfiled->nr_ops; i++) {
790 * TODO: error check + cond variable to stop io from starting
791 * unless all threads are created successfully
793 pthread_create(pfiled->iothread + i, NULL, io_loop, (void *) (pfiled->ios + i));
/*
 * NOTE(review): cache_lock is initialised only after the io threads have
 * been started. A thread that reaches dir_open() first would lock an
 * uninitialised mutex; initialise it before the pthread_create() loop.
 */
795 pthread_mutex_init(&pfiled->cache_lock, NULL);
800 xseg_leave(pfiled->xseg);
/*
 * Parse s as a base-10 integer with strtol(), accepting it only when at
 * least one digit was consumed and the whole string was used. The value
 * returned for rejected input is on lines not visible here (presumably
 * negative, which parse_cmdline()'s sanity checks reject).
 *
 * NOTE(review): ERANGE from strtol() is not checked, and the result is
 * returned as int.
 */
807 static int safe_atoi(char *s)
812 l = strtol(s, &endp, 10);
813 if (s != endp && *endp == '\0')
/*
 * Parse options into the cmdline_* globals and validate them. The options
 * are -p PORT, -n NR_OPS and -g XSEG_SPEC; -h and -v are also accepted.
 * The two positional arguments become the pithos block path and the volume
 * block path.
 *
 * Errors are reported with perr(PFE, ...), which is presumably fatal.
 * Checks:
 *  - the port must be >= 0;
 *  - NR_OPS must be >= 1;
 *  - the xseg spec is mandatory;
 *  - both paths are mandatory.
 *
 * argv[0] and argv[1] are read as the positional arguments, so argv must
 * already have been advanced past the options on lines not visible here.
 */
819 static void parse_cmdline(int argc, char **argv)
821 char *argv0 = argv[0];
827 c = getopt(argc, argv, "hp:n:g:v");
833 perr(PFE, 0, "Unknown option: -%c", optopt);
836 perr(PFE, 0, "Option -%c requires an argument",
844 cmdline_portno = safe_atoi(optarg);
847 cmdline_nr_ops = safe_atoi(optarg);
850 /* FIXME: Max length of spec? strdup, eww */
851 cmdline_xseg_spec = strdup(optarg);
852 if (!cmdline_xseg_spec)
853 perr(PFE, 0, "out of memory");
864 /* Sanity check for all arguments */
865 if (cmdline_portno < 0) {
867 perr(PFE, 0, "no or invalid port specified");
869 if (cmdline_nr_ops < 1) {
871 perr(PFE, 0, "specified outstanding request count is invalid");
873 if (!cmdline_xseg_spec) {
875 perr(PFE, 0, "xseg specification is mandatory");
880 perr(PFE, 0, "path and vpath specification is mandatory");
883 cmdline_path = strdup(argv[0]);
885 perr(PFE, 0, "out of memory");
887 cmdline_vpath = strdup(argv[1]);
889 perr(PFE, 0, "out of memory");
/*
 * Entry point: parse the command line, initialise pfiled (a failure is
 * reported with perr(PFE, ...)), and run the main loop, whose return value
 * becomes the exit status.
 *
 * NOTE(review): cmdline_nr_ops is a long but is printed with "%lu"; "%ld"
 * matches its type.
 */
892 int main(int argc, char **argv)
894 struct pfiled pfiled;
897 parse_cmdline(argc, argv);
899 perr(PI, 0, "p = %ld, nr_ops = %lu\n", cmdline_portno, cmdline_nr_ops);
901 if (pfiled_init(&pfiled) < 0)
902 perr(PFE, 0, "failed to initialize pfiled");
904 return pfiled_loop(&pfiled);