2 * The Pithos File Blocker Peer (pfiled)
19 #include <sys/sendfile.h>
21 #include <xseg/xseg.h>
22 #include <xseg/protocol.h>
24 #include "common.h" /* FIXME: */
26 #define MAX_PATH_SIZE 1024
27 #define MAX_FILENAME_SIZE 255
29 /* default concurrency level (number of threads) */
30 #define DEFAULT_NR_OPS 16
32 /* Pithos hash for the zero block
33 * FIXME: Should it be hardcoded?
36 "e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b85"
39 * Globals, holding command-line arguments
41 long cmdline_portno = -1; /* PORT: xseg port to bind; stays negative until a valid value is parsed, which the sanity check rejects */
42 char *cmdline_xseg_spec = NULL; /* XSEG_SPEC: strdup'ed spec string handed to join_or_create(); mandatory */
43 char *cmdline_path = NULL; /* PATH: path to pithos data blocks; copied into pfiled->path at init */
44 char *cmdline_vpath = NULL; /* VPATH: path to modified volume blocks; copied into pfiled->vpath at init */
45 char *cmdline_pidfile = NULL; /* optional pidfile path; when non-NULL main() opens and later removes it */
46 int cmdline_daemon = 0; /* when non-zero main() calls daemon(0, 1) */
47 long cmdline_nr_ops = DEFAULT_NR_OPS; /* NR_OPS: number of outstanding xseg requests; must be >= 1; fd cache is sized at twice this */
48 long cmdline_verbose = 0; /* -v: when non-zero dispatch() prints each io/req/op to stderr */
49 volatile unsigned int terminated = 0; /* shutdown flag, not a command-line option. NOTE(review): presumably set by signal_handler(); its body is not visible here, confirm */
51 static int usage(char *argv0)
54 "Usage: %s <PATH> <VPATH> [-p PORT] [-g XSEG_SPEC] [-n NR_OPS] [-v]\n\n"
56 "\tPATH: path to pithos data blocks\n"
57 "\tVPATH: path to modified volume blocks\n"
58 "\tPORT: xseg port to listen for requests on\n"
59 "\tXSEG_SPEC: xseg spec as 'type:name:nr_ports:nr_requests:"
60 "request_size:extra_size:page_shift'\n"
61 "\tNR_OPS: number of outstanding xseg requests\n"
62 "\t-v: verbose mode\n",
68 /* fdcache_node flags */
69 #define READY (1 << 1)
71 /* fdcache node info */
74 volatile unsigned int ref;
75 volatile unsigned long time;
76 volatile unsigned int flags;
78 char target[MAX_FILENAME_SIZE + 1];
84 struct xseg_port *xport;
91 struct sigevent sigevent;
94 uint64_t handled_reqs;
96 struct fdcache_node *fdcache;
98 pthread_mutex_t cache_lock;
99 char path[MAX_PATH_SIZE + 1];
100 char vpath[MAX_PATH_SIZE + 1];
104 * pfiled specific structure
105 * containing information on a pending I/O operation
108 struct pfiled *pfiled;
109 struct xseg_request *req;
114 pthread_mutex_t lock;
118 static inline int isTerminate()
120 /* ta doesn't need to be taken into account, because the main loop
121 * doesn't check the terminated flag if ta is not 0.
124 * return (!ta & terminated);
132 void signal_handler(int signal)
137 static int setup_signals()
141 sigemptyset(&sa.sa_mask);
143 sa.sa_handler = signal_handler;
144 r = sigaction(SIGTERM, &sa, NULL);
147 r = sigaction(SIGINT, &sa, NULL);
150 r = sigaction(SIGQUIT, &sa, NULL);
157 static unsigned long sigaction_count;
159 static void sigaction_handler(int sig, siginfo_t *siginfo, void *arg)
164 static void log_io(char *msg, struct io *io)
166 char target[65], data[65];
167 /* null-terminate name in case req->target is shorter than 63 characters,
168 * and next character after name (aka first byte of next buffer) is not
171 unsigned int end = (io->req->targetlen> 64) ? 64 : io->req->targetlen;
172 unsigned int dend = (io->req->datalen > 64) ? 64 : io->req->datalen;
173 char *req_target = xseg_get_target(io->pfiled->xseg, io->req);
174 char *req_data = xseg_get_data(io->pfiled->xseg, io->req);
175 strncpy(target, req_target, end);
177 strncpy(data, req_data, 64);
181 "%s: fd:%u, op:%u offset: %llu size: %lu retval: %lu, reqstate: %u, serviced: %u\n"
182 "target[%u]: '%s', data[%llu]:\n%s------------------\n\n",
184 (unsigned int)io->fdcacheidx, /* this is cacheidx not fd */
185 (unsigned int)io->req->op,
186 (unsigned long long)io->req->offset,
187 (unsigned long)io->req->size,
188 (unsigned long)io->retval,
189 (unsigned int)io->req->state,
190 (unsigned long)io->req->serviced,
191 (unsigned int)io->req->targetlen, target,
192 (unsigned long long)io->req->datalen, data);
195 static struct io *alloc_io(struct pfiled *pfiled)
197 xqindex idx = xq_pop_head(&pfiled->free_ops, 1);
200 return pfiled->ios + idx;
203 static inline void free_io(struct pfiled *pfiled, struct io *io)
205 xqindex idx = io - pfiled->ios;
207 xq_append_head(&pfiled->free_ops, idx, 1);
210 static void complete(struct pfiled *pfiled, struct io *io)
212 struct xseg_request *req = io->req;
213 req->state |= XS_SERVED;
215 log_io("complete", io);
216 xport p = xseg_respond(pfiled->xseg, req, pfiled->portno, X_ALLOC);
217 xseg_signal(pfiled->xseg, p);
218 __sync_fetch_and_sub(&pfiled->fdcache[io->fdcacheidx].ref, 1);
221 static void fail(struct pfiled *pfiled, struct io *io)
223 struct xseg_request *req = io->req;
224 req->state |= XS_FAILED;
227 xport p = xseg_respond(pfiled->xseg, req, pfiled->portno, X_ALLOC);
228 xseg_signal(pfiled->xseg, p);
229 if (io->fdcacheidx >= 0) {
230 __sync_fetch_and_sub(&pfiled->fdcache[io->fdcacheidx].ref, 1);
234 static void handle_unknown(struct pfiled *pfiled, struct io *io)
236 struct xseg_request *req = io->req;
237 char *data = xseg_get_data(pfiled->xseg, req);
238 snprintf(data, req->datalen, "unknown request op");
242 static int create_path(char *buf, char *path, char *target, uint32_t targetlen, int mkdirs)
246 uint32_t pathlen = strlen(path);
248 strncpy(buf, path, pathlen);
250 for (i = 0; i < 9; i+= 3) {
251 buf[pathlen + i] = target[i - (i/3)];
252 buf[pathlen + i +1] = target[i + 1 - (i/3)];
253 buf[pathlen + i + 2] = '/';
255 buf[pathlen + i + 3] = '\0';
256 if (stat(buf, &st) < 0)
257 if (mkdir(buf, 0700) < 0) {
264 strncpy(&buf[pathlen + 9], target, targetlen);
265 buf[pathlen + 9 + targetlen] = '\0';
270 static int dir_open(struct pfiled *pfiled, struct io *io,
271 char *target, uint32_t targetlen, int mode)
274 struct fdcache_node *ce = NULL;
276 char tmp[pfiled->path_len + targetlen + 10];
279 if (targetlen> MAX_FILENAME_SIZE)
284 pthread_mutex_lock(&pfiled->cache_lock);
288 for (i = 0; i < pfiled->maxfds; i++) {
289 if (pfiled->fdcache[i].ref == 0 && min > pfiled->fdcache[i].time
290 && (pfiled->fdcache[i].flags & READY)) {
291 min = pfiled->fdcache[i].time;
296 if (!strncmp(pfiled->fdcache[i].target, target, targetlen)) {
297 if (pfiled->fdcache[i].target[targetlen] == 0) {
298 ce = &pfiled->fdcache[i];
299 /* if any other io thread is currently opening
300 * the file, block until it succeeds or fails
302 if (!(ce->flags & READY)) {
303 pthread_cond_wait(&ce->cond, &pfiled->cache_lock);
304 /* when ready, restart lookup */
307 /* if successfully opened */
309 fd = pfiled->fdcache[i].fd;
313 /* else open failed for the other io thread, so
314 * it should fail for everyone waiting on this
326 /* all cache entries are currently being used */
327 pthread_mutex_unlock(&pfiled->cache_lock);
330 if (pfiled->fdcache[lru].ref){
332 printf("lru(%ld) ref not 0 (%u)\n", lru, pfiled->fdcache[lru].ref);
335 /* make room for new file */
336 ce = &pfiled->fdcache[lru];
337 /* set name here and state to not ready, for any other requests on the
338 * same target that may follow
340 strncpy(ce->target, target, targetlen);
341 ce->target[targetlen] = 0;
343 pthread_mutex_unlock(&pfiled->cache_lock);
346 if (close(ce->fd) < 0){
351 /* try opening it from pithos blocker dir */
352 if (create_path(tmp, pfiled->path, target, targetlen, 0) < 0) {
357 fd = open(tmp, O_RDWR);
359 /* try opening it from the tmp dir */
360 if (create_path(tmp, pfiled->vpath, target, targetlen, 0) < 0)
363 fd = open(tmp, O_RDWR);
365 if (create_path(tmp, pfiled->vpath, target, targetlen, 1) < 0) {
370 fd = open(tmp, O_RDWR | O_CREAT, 0600);
376 /* insert in cache a negative fd to indicate opening error to
377 * any other ios waiting for the file to open
380 /* insert in cache */
382 pthread_mutex_lock(&pfiled->cache_lock);
386 pthread_cond_broadcast(&ce->cond);
388 io->fdcacheidx = lru;
396 pfiled->handled_reqs++;
397 ce->time = pfiled->handled_reqs;
398 __sync_fetch_and_add(&ce->ref, 1);
399 pthread_mutex_unlock(&pfiled->cache_lock);
404 pthread_mutex_unlock(&pfiled->cache_lock);
408 static void handle_read_write(struct pfiled *pfiled, struct io *io)
411 struct xseg_request *req = io->req;
412 char *target = xseg_get_target(pfiled->xseg, req);
413 char *data = xseg_get_data(pfiled->xseg, req);
415 fd = dir_open(pfiled, io, target, req->targetlen, 0);
423 printf("0.%p vs %p!\n", (void *)req, (void *)io->req);
425 if (req->flags & (XF_FLUSH | XF_FUA)) {
426 /* No FLUSH/FUA support yet (O_SYNC ?).
427 * note that with FLUSH/size == 0
428 * there will probably be a (uint64_t)-1 offset */
429 complete(pfiled, io);
432 complete(pfiled, io);
439 while (req->serviced < req->datalen) {
440 r = pread(fd, data + req->serviced,
441 req->datalen - req->serviced,
442 req->offset + req->serviced);
444 req->datalen = req->serviced;
448 /* reached end of file. zero out the rest of the data buffer */
449 memset(data + req->serviced, 0, req->datalen - req->serviced);
450 req->serviced = req->datalen;
458 while (req->serviced < req->datalen) {
459 r = pwrite(fd, data + req->serviced,
460 req->datalen - req->serviced,
461 req->offset + req->serviced);
463 req->datalen = req->serviced;
466 fprintf(stderr, "write returned 0\n");
467 memset(data + req->serviced, 0, req->datalen - req->serviced);
468 req->serviced = req->datalen;
477 /* if fsync fails, then no bytes serviced correctly */
482 snprintf(data, req->datalen,
483 "wtf, corrupt op %u?\n", req->op);
488 if (req->serviced > 0 ) {
489 complete(pfiled, io);
492 strerror_r(errno, data, req->datalen);
498 static void handle_info(struct pfiled *pfiled, struct io *io)
500 struct xseg_request *req = io->req;
504 char *target = xseg_get_target(pfiled->xseg, req);
505 char *data = xseg_get_data(pfiled->xseg, req);
506 struct xseg_reply_info *xinfo = (struct xseg_reply_info *)data;
508 fd = dir_open(pfiled, io, target, req->targetlen, 0);
514 r = fstat(fd, &stat);
521 size = (uint64_t)stat.st_size;
524 complete(pfiled, io);
527 static void handle_copy(struct pfiled *pfiled, struct io *io)
529 struct xseg_request *req = io->req;
530 char *target = xseg_get_target(pfiled->xseg, req);
531 char *data = xseg_get_data(pfiled->xseg, req);
532 struct xseg_request_copy *xcopy = (struct xseg_request_copy *)data;
534 //FIXME is 256 enough?
535 char *buf = malloc(256);
538 dst = dir_open(pfiled, io, target, req->targetlen, 1);
540 fprintf(stderr, "fail in dst\n");
545 if (create_path(buf, pfiled->path, xcopy->target, xcopy->targetlen, 0) < 0) {
550 src = open(buf, O_RDWR);
552 XSEGLOG("fail in src %s\n", buf);
559 n = sendfile(dst, src, 0, st.st_size);
560 if (n != st.st_size) {
561 fprintf(stderr, "fail in copy\n");
567 fprintf(stderr, "fail in cp\n");
572 complete(pfiled, io);
578 static void handle_delete(struct pfiled *pfiled, struct io *io)
580 struct xseg_request *req = io->req;
581 char *buf = malloc(255);
583 char *target = xseg_get_target(pfiled->xseg, req);
585 fd = dir_open(pfiled, io, target, req->targetlen, 0);
587 fprintf(stderr, "fail in dir_open\n");
592 /* 'invalidate' cache entry */
593 if (io->fdcacheidx >= 0) {
594 pfiled->fdcache[io->fdcacheidx].fd = -1;
599 if (create_path(buf, pfiled->vpath, target, req->targetlen, 0) < 0) {
605 complete(pfiled, io);
610 static void handle_open(struct pfiled *pfiled, struct io *io)
612 struct xseg_request *req = io->req;
613 char *buf = malloc(MAX_FILENAME_SIZE + strlen("_lock"));
614 char *pathname = malloc(MAX_PATH_SIZE + MAX_FILENAME_SIZE + strlen("_lock"));
616 char *target = xseg_get_target(pfiled->xseg, req);
618 if (!buf || !pathname) {
623 strncpy(buf, target, req->targetlen);
624 strncpy(buf+req->targetlen, "_lock", strlen("_lock"));
626 if (create_path(pathname, pfiled->vpath, buf, req->targetlen + strlen("_lock"), 1) < 0) {
630 fd = open(pathname, O_CREAT | O_EXCL, S_IRWXU | S_IRUSR);
637 complete(pfiled, io);
647 static void handle_close(struct pfiled *pfiled, struct io *io)
649 struct xseg_request *req = io->req;
650 char *buf = malloc(MAX_FILENAME_SIZE + strlen("_lock"));
651 char *pathname = malloc(MAX_PATH_SIZE + MAX_FILENAME_SIZE + strlen("_lock"));
652 char *target = xseg_get_target(pfiled->xseg, req);
654 if (!buf || !pathname) {
659 strncpy(buf, target, req->targetlen);
660 strncpy(buf+req->targetlen, "_lock", strlen("_lock"));
662 if (create_path(pathname, pfiled->vpath, buf, req->targetlen + strlen("_lock"), 1) < 0) {
668 complete(pfiled, io);
678 static void dispatch(struct pfiled *pfiled, struct io *io)
680 if (cmdline_verbose) {
681 fprintf(stderr, "io: 0x%p, req: 0x%p, op %u\n",
682 (void *)io, (void *)io->req, io->req->op);
685 switch (io->req->op) {
688 handle_read_write(pfiled, io); break;
690 handle_info(pfiled, io); break;
692 handle_copy(pfiled, io); break;
694 handle_delete(pfiled, io); break;
696 handle_open(pfiled, io); break;
698 handle_close(pfiled, io); break;
702 handle_unknown(pfiled, io);
706 static void handle_accepted(struct pfiled *pfiled, struct io *io)
708 struct xseg_request *req = io->req;
710 io->state = XS_ACCEPTED;
712 dispatch(pfiled, io);
715 static struct io* wake_up_next_iothread(struct pfiled *pfiled)
717 struct io *io = alloc_io(pfiled);
720 pthread_mutex_lock(&io->lock);
721 pthread_cond_signal(&io->cond);
722 pthread_mutex_unlock(&io->lock);
727 void *io_loop(void *arg)
729 struct io *io = (struct io *) arg;
730 struct pfiled *pfiled = io->pfiled;
731 struct xseg *xseg = pfiled->xseg;
732 uint32_t portno = pfiled->portno;
733 struct xseg_request *accepted;
738 accepted = xseg_accept(xseg, portno, 0);
741 wake_up_next_iothread(pfiled);
742 handle_accepted(pfiled, io);
745 pthread_mutex_lock(&io->lock);
747 pthread_cond_wait(&io->cond, &io->lock);
748 pthread_mutex_unlock(&io->lock);
755 static struct xseg *join_or_create(char *spec)
757 struct xseg_config config;
760 (void)xseg_parse_spec(spec, &config);
761 xseg = xseg_join(config.type, config.name, "posix", NULL);
765 (void)xseg_create(&config);
766 return xseg_join(config.type, config.name, "posix", NULL);
769 static int pfiled_loop(struct pfiled *pfiled)
771 struct xseg *xseg = pfiled->xseg;
772 uint32_t portno = pfiled->portno;
773 /* GCC + pthreads glitch? */
776 for (;!(isTerminate() && xq_count(&pfiled->free_ops) == pfiled->nr_ops);) {
777 io = wake_up_next_iothread(pfiled);
778 xseg_prepare_wait(xseg, portno);
779 xseg_wait_signal(xseg, 1000000UL);
785 static int pfiled_init(struct pfiled *pfiled)
791 pfiled->sigevent.sigev_notify = SIGEV_SIGNAL;
792 pfiled->sigevent.sigev_signo = SIGIO;
793 sa.sa_sigaction = sigaction_handler;
794 sa.sa_flags = SA_SIGINFO;
796 if ((ret = sigemptyset(&sa.sa_mask))) {
797 perr(PE, 0, "[sigemptyset]");
801 if ((ret = sigaction(SIGIO, &sa, NULL))) {
802 perr(PE, 0, "[sigaction]");
803 /* FIXME: Since this is an init routine, if it fails the program will
804 * exit and clean up its own resources (memory, signals, etc.). We only
805 * have to clean up anything xseg-related
810 pfiled->nr_ops = cmdline_nr_ops;
811 pfiled->maxfds = 2 * cmdline_nr_ops;
813 pfiled->fdcache = calloc(pfiled->maxfds, sizeof(struct fdcache_node));
814 if(!pfiled->fdcache) {
816 perr(PE, 0, "could not allocate memory [fdcache]");
821 pfiled->free_bufs = calloc(pfiled->nr_ops, sizeof(xqindex));
822 if(!pfiled->free_bufs) {
824 perr(PE, 0, "could not allocate memory [free_bufs]");
828 pfiled->iothread = calloc(pfiled->nr_ops, sizeof(pthread_t));
829 if(!pfiled->iothread) {
831 perr(PE, 0, "could not allocate memory [iothreads]");
835 pfiled->ios = calloc(pfiled->nr_ops, sizeof(struct io));
838 perr(PE, 0, "could not allocate memory [ios]");
842 for (i = 0; i < pfiled->nr_ops; i++) {
843 pfiled->ios[i].pfiled = pfiled;
844 pthread_cond_init(&pfiled->ios[i].cond, NULL);
845 pthread_mutex_init(&pfiled->ios[i].lock, NULL);
848 xq_init_seq(&pfiled->free_ops, pfiled->nr_ops, pfiled->nr_ops,
851 pfiled->handled_reqs = 0;
853 strncpy(pfiled->path, cmdline_path, MAX_PATH_SIZE);
854 pfiled->path[MAX_PATH_SIZE] = 0;
856 strncpy(pfiled->vpath, cmdline_vpath, MAX_PATH_SIZE);
857 pfiled->vpath[MAX_PATH_SIZE] = 0;
859 pfiled->path_len = strlen(pfiled->path);
860 if (pfiled->path[pfiled->path_len -1] != '/'){
861 pfiled->path[pfiled->path_len] = '/';
862 pfiled->path[++pfiled->path_len]= 0;
865 pfiled->vpath_len = strlen(pfiled->vpath);
866 if (pfiled->vpath[pfiled->vpath_len -1] != '/'){
867 pfiled->vpath[pfiled->vpath_len] = '/';
868 pfiled->vpath[++pfiled->vpath_len]= 0;
871 if (xseg_initialize()) {
873 perr(PE, 0, "could not initialize xseg library");
877 pfiled->xseg = join_or_create(cmdline_xseg_spec);
880 perr(PE, 0, "could not join xseg with spec '%s'\n",
882 goto out_with_xseginit;
885 pfiled->xport = xseg_bind_port(pfiled->xseg, cmdline_portno, NULL);
886 if (!pfiled->xport) {
888 perr(PE, 0, "could not bind to xseg port %ld", cmdline_portno);
889 goto out_with_xsegjoin;
892 pfiled->portno = xseg_portno(pfiled->xseg, pfiled->xport);
893 perr(PI, 0, "filed on port %u/%u\n",
894 pfiled->portno, pfiled->xseg->config.nr_ports);
896 if (xseg_init_local_signal(pfiled->xseg, pfiled->portno) < 0){
897 perr(PE, 0, "cannot int local signals\n");
901 for (i = 0; i < pfiled->nr_ops; i++) {
902 pthread_cond_init(&pfiled->fdcache[i].cond, NULL);
903 pfiled->fdcache[i].flags = READY;
905 for (i = 0; i < pfiled->nr_ops; i++) {
907 * TODO: error check + cond variable to stop io from starting
908 * unless all threads are created successfully
910 pthread_create(pfiled->iothread + i, NULL, io_loop, (void *) (pfiled->ios + i));
912 pthread_mutex_init(&pfiled->cache_lock, NULL);
917 xseg_leave(pfiled->xseg);
924 static int safe_atoi(char *s)
929 l = strtol(s, &endp, 10);
930 if (s != endp && *endp == '\0')
936 static void parse_cmdline(int argc, char **argv)
938 char *argv0 = argv[0];
944 c = getopt(argc, argv, "dhp:n:g:vf:");
950 perr(PFE, 0, "Unknown option: -%c", optopt);
953 perr(PFE, 0, "Option -%c requires an argument",
961 cmdline_portno = safe_atoi(optarg);
964 cmdline_nr_ops = safe_atoi(optarg);
967 /* FIXME: Max length of spec? strdup, eww */
968 cmdline_xseg_spec = strdup(optarg);
969 if (!cmdline_xseg_spec)
970 perr(PFE, 0, "out of memory");
979 /* FIXME: Max length of pidfile path? strdup, eww */
980 cmdline_pidfile = strdup(optarg);
981 if (!cmdline_pidfile)
982 perr(PFE, 0, "out of memory");
990 /* Sanity check for all arguments */
991 if (cmdline_portno < 0) {
993 perr(PFE, 0, "no or invalid port specified");
995 if (cmdline_nr_ops < 1) {
997 perr(PFE, 0, "specified outstanding request count is invalid");
999 if (!cmdline_xseg_spec) {
1001 perr(PFE, 0, "xseg specification is mandatory");
1006 perr(PFE, 0, "path and vpath specification is mandatory");
1009 cmdline_path = strdup(argv[0]);
1011 perr(PFE, 0, "out of memory");
1013 cmdline_vpath = strdup(argv[1]);
1015 perr(PFE, 0, "out of memory");
1018 int pidfile_remove(char *path, int fd)
1021 return (unlink(path));
1024 int pidfile_write(int pid_fd)
1027 snprintf(buf, sizeof(buf), "%ld", syscall(SYS_gettid));
1030 lseek(pid_fd, 0, SEEK_SET);
1031 int ret = write(pid_fd, buf, strlen(buf));
1035 int pidfile_read(char *path, pid_t *pid)
1037 char buf[16], *endptr;
1040 int fd = open(path, O_RDONLY);
1043 int ret = read(fd, buf, 15);
1049 *pid = strtol(buf, &endptr, 10);
1050 if (endptr != &buf[ret]){
1058 int pidfile_open(char *path, pid_t *old_pid)
1061 int fd = open(path, O_CREAT|O_EXCL|O_WRONLY);
1063 if (errno == -EEXIST)
1064 pidfile_read(path, old_pid);
1069 int main(int argc, char **argv)
1071 int pid_fd = -1, r = 0;
1073 struct pfiled pfiled;
1075 init_perr("pfiled");
1076 parse_cmdline(argc, argv);
1078 perr(PI, 0, "p = %ld, nr_ops = %lu\n", cmdline_portno, cmdline_nr_ops);
1080 if (cmdline_pidfile){
1081 pid_fd = pidfile_open(cmdline_pidfile, &old_pid);
1084 perr(PFE, 0, "Daemon already running, pid: %d.", old_pid);
1086 perr(PFE, 0, "Cannot open or create pidfile");
1092 if (cmdline_daemon){
1093 if (daemon(0, 1) < 0){
1094 perr(PFE, 0, "Cannot daemonize");
1101 pidfile_write(pid_fd);
1104 if (pfiled_init(&pfiled) < 0){
1109 r = pfiled_loop(&pfiled);
1112 pidfile_remove(cmdline_pidfile, pid_fd);