fix missing map object creation in volume creation, plus map size, map error handling
[archipelago] / xseg / peers / user / pfiled.c
index 605f920..b47565a 100644 (file)
@@ -15,6 +15,7 @@
 #include <signal.h>
 #include <limits.h>
 #include <pthread.h>
+#include <syscall.h>
 #include <sys/sendfile.h>
 
 #include <xseg/xseg.h>
@@ -41,22 +42,25 @@ long cmdline_portno = -1;
 char *cmdline_xseg_spec = NULL;
 char *cmdline_path = NULL;
 char *cmdline_vpath = NULL;
+char *cmdline_pidfile = NULL;
+int  cmdline_daemon = 0;
 long cmdline_nr_ops = DEFAULT_NR_OPS;
 long cmdline_verbose = 0;
+volatile unsigned int terminated = 0;
 
+/*
+ * Print the command-line synopsis and option descriptions to stderr.
+ *
+ * argv0: program name substituted into the "Usage:" line.
+ * Always returns 1.
+ */
 static int usage(char *argv0)
 {
        fprintf(stderr,
-               "Usage: %s <PATH> <VPATH> [-p PORT] [-g XSEG_SPEC] [-n NR_OPS] [-v]\n\n"
-               "where:\n"
-               "\tPATH: path to pithos data blocks\n"
-               "\tVPATH: path to modified volume blocks\n"
-               "\tPORT: xseg port to listen for requests on\n"
-               "\tXSEG_SPEC: xseg spec as 'type:name:nr_ports:nr_requests:"
+                       "Usage: %s <PATH> <VPATH> [-p PORT] [-g XSEG_SPEC] [-n NR_OPS] [-v] [-d] [-f PIDFILE]\n\n"
+                       "where:\n"
+                       "\tPATH: path to pithos data blocks\n"
+                       "\tVPATH: path to modified volume blocks\n"
+                       "\tPORT: xseg port to listen for requests on\n"
+                       "\tXSEG_SPEC: xseg spec as 'type:name:nr_ports:nr_requests:"
                        "request_size:extra_size:page_shift'\n"
-               "\tNR_OPS: number of outstanding xseg requests\n"
-               "\t-v: verbose mode\n",
-               argv0);
+                       "\tNR_OPS: number of outstanding xseg requests\n"
+                       "\tPIDFILE: file created at startup to hold the process id\n"
+                       "\t-v: verbose mode\n"
+                       "\t-d: run as a daemon\n",
+                       argv0);
 
        return 1;
 }
@@ -111,6 +115,45 @@ struct io {
 };
 
 
+/*
+ * Report whether termination has been requested.
+ *
+ * Returns the current value of the global 'terminated' flag, which
+ * signal_handler() sets to 1.
+ */
+static inline int isTerminate(void)
+{
+       /* 'ta' does not need to be taken into account, because the main loop
+        * does not check the terminated flag while ta is not 0.
+        *
+        * #ifdef ST_THREADS
+        * return (!ta & terminated);
+        * #else
+        * return terminated;
+        * #endif
+        */
+       return terminated;
+}
+
+/*
+ * Signal handler installed by setup_signals().
+ *
+ * Records a termination request by setting the global 'terminated' flag
+ * to 1. The signal number argument is not used.
+ */
+void signal_handler(int signal)
+{      
+       terminated = 1;
+}
+
+/*
+ * Install signal_handler() for SIGTERM, SIGINT and SIGQUIT, in that order,
+ * with an empty signal mask and no flags.
+ *
+ * Stops at the first sigaction() call that fails and returns its negative
+ * result; otherwise returns the result of the last call.
+ */
+static int setup_signals()
+{
+       static const int handled[] = { SIGTERM, SIGINT, SIGQUIT };
+       struct sigaction action;
+       unsigned int i;
+       int rc = 0;
+
+       action.sa_handler = signal_handler;
+       action.sa_flags = 0;
+       sigemptyset(&action.sa_mask);
+
+       for (i = 0; i < sizeof(handled) / sizeof(handled[0]); i++) {
+               rc = sigaction(handled[i], &action, NULL);
+               if (rc < 0)
+                       break;
+       }
+       return rc;
+}
+
+
 static unsigned long sigaction_count;
 
 static void sigaction_handler(int sig, siginfo_t *siginfo, void *arg)
@@ -135,18 +178,18 @@ static void log_io(char *msg, struct io *io)
        data[dend] = 0;
 
        fprintf(stderr,
-               "%s: fd:%u, op:%u offset: %llu size: %lu retval: %lu, reqstate: %u, serviced: %u\n"
-               "target[%u]: '%s', data[%llu]:\n%s------------------\n\n",
-               msg,
-               (unsigned int)io->fdcacheidx, /* this is cacheidx not fd */
-               (unsigned int)io->req->op,
-               (unsigned long long)io->req->offset,
-               (unsigned long)io->req->size,
-               (unsigned long)io->retval,
-               (unsigned int)io->req->state,
-               (unsigned long)io->req->serviced,
-               (unsigned int)io->req->targetlen, target,
-               (unsigned long long)io->req->datalen, data);
+                       "%s: fd:%u, op:%u offset: %llu size: %lu retval: %lu, reqstate: %u, serviced: %u\n"
+                       "target[%u]: '%s', data[%llu]:\n%s------------------\n\n",
+                       msg,
+                       (unsigned int)io->fdcacheidx, /* this is cacheidx not fd */
+                       (unsigned int)io->req->op,
+                       (unsigned long long)io->req->offset,
+                       (unsigned long)io->req->size,
+                       (unsigned long)io->retval,
+                       (unsigned int)io->req->state,
+                       (unsigned long)io->req->serviced,
+                       (unsigned int)io->req->targetlen, target,
+                       (unsigned long long)io->req->datalen, data);
 }
 
 static struct io *alloc_io(struct pfiled *pfiled)
@@ -225,7 +268,7 @@ static int create_path(char *buf, char *path, char *target, uint32_t targetlen,
 }
 
 static int dir_open(struct pfiled *pfiled, struct io *io,
-                       char *target, uint32_t targetlen, int mode)
+               char *target, uint32_t targetlen, int mode)
 {
        int fd = -1;
        struct fdcache_node *ce = NULL;
@@ -310,7 +353,7 @@ start_locked:
                fd = -1;
                goto new_entry;
        }
-       
+
        fd = open(tmp, O_RDWR);
        if (fd < 0) {
                /* try opening it from the tmp dir */
@@ -323,7 +366,7 @@ start_locked:
                                fd = -1;
                                goto new_entry;
                        }
-       
+
                        fd = open(tmp, O_RDWR | O_CREAT, 0600);         
                        if (fd < 0)
                                perror(tmp);
@@ -392,54 +435,54 @@ static void handle_read_write(struct pfiled *pfiled, struct io *io)
        }
 
        switch (req->op) {
-       case X_READ:
-               while (req->serviced < req->datalen) {
-                       r = pread(fd, data + req->serviced, 
-                                       req->datalen - req->serviced,
-                                       req->offset + req->serviced);
-                       if (r < 0) {
-                               req->datalen = req->serviced;
-                               perror("pread");
-                       }
-                       else if (r == 0) {
-                               /* reached end of file. zero out the rest data buffer */
-                               memset(data + req->serviced, 0, req->datalen - req->serviced);
-                               req->serviced = req->datalen;
-                       }
-                       else {
-                               req->serviced += r;
-                       }
-               }
-               break;
-       case X_WRITE:
-               while (req->serviced < req->datalen) {
-                       r = pwrite(fd, data + req->serviced, 
-                                       req->datalen - req->serviced,
-                                       req->offset + req->serviced);
-                       if (r < 0) {
-                               req->datalen = req->serviced;
+               case X_READ:
+                       while (req->serviced < req->datalen) {
+                               r = pread(fd, data + req->serviced, 
+                                               req->datalen - req->serviced,
+                                               req->offset + req->serviced);
+                               if (r < 0) {
+                                       req->datalen = req->serviced;
+                                       perror("pread");
+                               }
+                               else if (r == 0) {
+                                       /* reached end of file. zero out the rest data buffer */
+                                       memset(data + req->serviced, 0, req->datalen - req->serviced);
+                                       req->serviced = req->datalen;
+                               }
+                               else {
+                                       req->serviced += r;
+                               }
                        }
-                       else if (r == 0) {
-                               fprintf(stderr, "write returned 0\n");
-                               memset(data + req->serviced, 0, req->datalen - req->serviced);
-                               req->serviced = req->datalen;
+                       break;
+               case X_WRITE:
+                       while (req->serviced < req->datalen) {
+                               r = pwrite(fd, data + req->serviced, 
+                                               req->datalen - req->serviced,
+                                               req->offset + req->serviced);
+                               if (r < 0) {
+                                       req->datalen = req->serviced;
+                               }
+                               else if (r == 0) {
+                                       fprintf(stderr, "write returned 0\n");
+                                       memset(data + req->serviced, 0, req->datalen - req->serviced);
+                                       req->serviced = req->datalen;
+                               }
+                               else {
+                                       req->serviced += r;
+                               }
                        }
-                       else {
-                               req->serviced += r;
+                       r = fsync(fd);
+                       if (r< 0) {
+                               perror("fsync");
+                               /* if fsync fails, then no bytes serviced correctly */
+                               req->serviced = 0;
                        }
-               }
-               r = fsync(fd);
-               if (r< 0) {
-                       perror("fsync");
-                       /* if fsync fails, then no bytes serviced correctly */
-                       req->serviced = 0;
-               }
-               break;
-       default:
-               snprintf(data, req->datalen,
-                        "wtf, corrupt op %u?\n", req->op);
-               fail(pfiled, io);
-               return;
+                       break;
+               default:
+                       snprintf(data, req->datalen,
+                                       "wtf, corrupt op %u?\n", req->op);
+                       fail(pfiled, io);
+                       return;
        }
 
        if (req->serviced > 0 ) {
@@ -538,7 +581,7 @@ static void handle_delete(struct pfiled *pfiled, struct io *io)
        char *buf = malloc(255);
        int fd;
        char *target = xseg_get_target(pfiled->xseg, req);
-       
+
        fd = dir_open(pfiled, io, target, req->targetlen, 0);
        if (fd < 0) {
                fprintf(stderr, "fail in dir_open\n");
@@ -587,7 +630,7 @@ static void handle_open(struct pfiled *pfiled, struct io *io)
        fd = open(pathname, O_CREAT | O_EXCL, S_IRWXU | S_IRUSR);
        if (fd < 0)
                goto out_fail;
-       
+
        close(fd);
        free(buf);
        free(pathname);
@@ -636,27 +679,27 @@ static void dispatch(struct pfiled *pfiled, struct io *io)
 {
        if (cmdline_verbose) { 
                fprintf(stderr, "io: 0x%p, req: 0x%p, op %u\n",
-                       (void *)io, (void *)io->req, io->req->op);
+                               (void *)io, (void *)io->req, io->req->op);
        }
 
        switch (io->req->op) {
-       case X_READ:
-       case X_WRITE:
-               handle_read_write(pfiled, io); break;
-       case X_INFO:
-               handle_info(pfiled, io); break;
-       case X_COPY:
-               handle_copy(pfiled, io); break;
-       case X_DELETE:
-               handle_delete(pfiled, io); break;
-       case X_OPEN:
-               handle_open(pfiled, io); break;
-       case X_CLOSE:
-               handle_close(pfiled, io); break;
-//     case X_SNAPSHOT:
-       case X_SYNC:
-       default:
-               handle_unknown(pfiled, io);
+               case X_READ:
+               case X_WRITE:
+                       handle_read_write(pfiled, io); break;
+               case X_INFO:
+                       handle_info(pfiled, io); break;
+               case X_COPY:
+                       handle_copy(pfiled, io); break;
+               case X_DELETE:
+                       handle_delete(pfiled, io); break;
+               case X_OPEN:
+                       handle_open(pfiled, io); break;
+               case X_CLOSE:
+                       handle_close(pfiled, io); break;
+                       //      case X_SNAPSHOT:
+               case X_SYNC:
+               default:
+                       handle_unknown(pfiled, io);
        }
 }
 
@@ -691,7 +734,8 @@ void *io_loop(void *arg)
 
        for (;;) {
                accepted = NULL;
-               accepted = xseg_accept(xseg, portno, 0);
+               if (!isTerminate())
+                       accepted = xseg_accept(xseg, portno, 0);
                if (accepted) {
                        io->req = accepted;
                        wake_up_next_iothread(pfiled);
@@ -729,7 +773,7 @@ static int pfiled_loop(struct pfiled *pfiled)
        /* GCC + pthreads glitch? */
        struct io *io;
 
-       for (;;) {
+       for (;!(isTerminate() && xq_count(&pfiled->free_ops) == pfiled->nr_ops);) {
                io = wake_up_next_iothread(pfiled);
                xseg_prepare_wait(xseg, portno);
                xseg_wait_signal(xseg, 1000000UL);
@@ -772,7 +816,7 @@ static int pfiled_init(struct pfiled *pfiled)
                perr(PE, 0, "could not allocate memory [fdcache]");
                goto out;
        }
-               
+
 
        pfiled->free_bufs = calloc(pfiled->nr_ops, sizeof(xqindex));
        if(!pfiled->free_bufs) {
@@ -802,8 +846,8 @@ static int pfiled_init(struct pfiled *pfiled)
        }
 
        xq_init_seq(&pfiled->free_ops, pfiled->nr_ops, pfiled->nr_ops,
-                               pfiled->free_bufs);
-       
+                       pfiled->free_bufs);
+
        pfiled->handled_reqs = 0;
 
        strncpy(pfiled->path, cmdline_path, MAX_PATH_SIZE);
@@ -834,7 +878,7 @@ static int pfiled_init(struct pfiled *pfiled)
        if (!pfiled->xseg) {
                ret = -EIO;
                perr(PE, 0, "could not join xseg with spec '%s'\n", 
-                       cmdline_xseg_spec);
+                               cmdline_xseg_spec);
                goto out_with_xseginit;
        }
 
@@ -847,10 +891,10 @@ static int pfiled_init(struct pfiled *pfiled)
 
        pfiled->portno = xseg_portno(pfiled->xseg, pfiled->xport);
        perr(PI, 0, "filed on port %u/%u\n",
-               pfiled->portno, pfiled->xseg->config.nr_ports);
+                       pfiled->portno, pfiled->xseg->config.nr_ports);
 
        if (xseg_init_local_signal(pfiled->xseg, pfiled->portno) < 0){
-               printf("cannot int local signals\n");
+               perr(PE, 0, "cannot int local signals\n");
                return -1;
        }
 
@@ -897,17 +941,17 @@ static void parse_cmdline(int argc, char **argv)
                int c;
 
                opterr = 0;
-               c = getopt(argc, argv, "hp:n:g:v");
+               c = getopt(argc, argv, "dhp:n:g:vf:");
                if (c == -1)
                        break;
-               
+
                switch(c) {
                        case '?':
                                perr(PFE, 0, "Unknown option: -%c", optopt);
                                break;
                        case ':':
                                perr(PFE, 0, "Option -%c requires an argument",
-                                       optopt);
+                                               optopt);
                                break;
                        case 'h':
                                usage(argv0);
@@ -924,10 +968,19 @@ static void parse_cmdline(int argc, char **argv)
                                cmdline_xseg_spec = strdup(optarg);
                                if (!cmdline_xseg_spec)
                                        perr(PFE, 0, "out of memory");
-                                break;
+                               break;
                        case 'v':
                                cmdline_verbose = 1;
                                break;
+                       case 'd':
+                               cmdline_daemon = 1;
+                               break;
+                       case 'f':
+                               /* FIXME: Max length of spec? strdup, eww */
+                               cmdline_pidfile = strdup(optarg);
+                               if (!cmdline_pidfile)
+                                       perr(PFE, 0, "out of memory");
+                               break;
                }
        }
 
@@ -962,17 +1015,101 @@ static void parse_cmdline(int argc, char **argv)
                perr(PFE, 0, "out of memory");
 }
 
+/*
+ * Close the pidfile descriptor and delete the pidfile.
+ *
+ * path: pidfile path passed to unlink()
+ * fd:   descriptor closed first; the close() result is ignored
+ *
+ * Returns the result of unlink().
+ */
+int pidfile_remove(char *path, int fd)
+{
+       int rc;
+
+       close(fd);
+       rc = unlink(path);
+       return rc;
+}
+
+/*
+ * Write the caller's id, as returned by syscall(SYS_gettid), in decimal at
+ * the start of the file referred to by pid_fd.
+ *
+ * The lseek() result is not checked. Returns the result of write().
+ */
+int pidfile_write(int pid_fd)
+{
+       char text[16];
+       int written;
+
+       snprintf(text, sizeof(text), "%ld", syscall(SYS_gettid));
+       text[sizeof(text) - 1] = 0;
+
+       lseek(pid_fd, 0, SEEK_SET);
+       written = write(pid_fd, text, strlen(text));
+       return written;
+}
+
+/*
+ * Read a decimal process id from the file at 'path'.
+ *
+ * path: pidfile to read
+ * pid:  out parameter; set to the parsed value on success and to 0 on
+ *       every failure path
+ *
+ * Returns 0 on success. Returns -1 when the file cannot be opened or read,
+ * is empty, or holds anything other than one decimal number optionally
+ * followed by a single newline.
+ */
+int pidfile_read(char *path, pid_t *pid)
+{
+       char buf[16], *endptr;
+       ssize_t ret;
+       long val;
+       int fd;
+
+       *pid = 0;
+
+       fd = open(path, O_RDONLY);
+       if (fd < 0)
+               return -1;
+       ret = read(fd, buf, sizeof(buf) - 1);
+       close(fd);
+       if (ret <= 0)
+               return -1;
+       /* Terminate right after the bytes actually read, so strtol() never
+        * scans uninitialised parts of buf. */
+       buf[ret] = 0;
+
+       errno = 0;
+       val = strtol(buf, &endptr, 10);
+       if (endptr == buf || errno == ERANGE)
+               return -1;
+       /* pidfiles written by other tools commonly end with a newline */
+       if (*endptr == '\n')
+               endptr++;
+       if (endptr != &buf[ret])
+               return -1;
+
+       *pid = val;
+       return 0;
+}
+
+/*
+ * Exclusively create the pidfile at 'path' for writing.
+ *
+ * path:    pidfile path
+ * old_pid: out parameter; always set. It is 0 unless the file already
+ *          exists and pidfile_read() could parse a pid from it.
+ *
+ * Returns the new descriptor, or the negative result of open() on failure.
+ */
+int pidfile_open(char *path, pid_t *old_pid)
+{
+       int fd;
+
+       /* Callers inspect *old_pid after any failure, so define it up front. */
+       *old_pid = 0;
+
+       /* NOTE(review): original remark here was "nfs version > 3"; O_EXCL
+        * presumably needs a recent NFS protocol version to be reliable. */
+       /* O_CREAT requires an explicit mode argument. */
+       fd = open(path, O_CREAT|O_EXCL|O_WRONLY, 0644);
+       /* errno values are positive: compare against EEXIST, not -EEXIST */
+       if (fd < 0 && errno == EEXIST)
+               pidfile_read(path, old_pid);
+       return fd;
+}
+
+/*
+ * Program entry point.
+ *
+ * Parses the command line, optionally creates the pidfile (-f) and
+ * daemonizes (-d), installs the termination signal handlers, records the
+ * process id in the pidfile, then initializes pfiled and runs its main
+ * loop. The pidfile, if one was created, is removed before returning.
+ *
+ * Returns the result of pfiled_loop(), or -1 when an earlier step fails.
+ */
 int main(int argc, char **argv)
 {
+       int pid_fd = -1, r = 0;
+       /* stays 0 unless pidfile_open() reports the pid of an earlier instance */
+       pid_t old_pid = 0;
        struct pfiled pfiled;
 
        init_perr("pfiled");
        parse_cmdline(argc, argv);
 
        perr(PI, 0, "p = %ld, nr_ops = %lu\n", cmdline_portno, cmdline_nr_ops);
+
+       if (cmdline_pidfile){
+               pid_fd = pidfile_open(cmdline_pidfile, &old_pid);
+               if (pid_fd < 0) {
+                       if (old_pid) {
+                               perr(PFE, 0, "Daemon already running, pid: %d.", (int)old_pid);
+                       } else {
+                               perr(PFE, 0, "Cannot open or create pidfile");
+                       }
+                       return -1;
+               }
+       }
+
+       if (cmdline_daemon){
+               if (daemon(0, 1) < 0){
+                       perr(PFE, 0, "Cannot daemonize");
+                       r = -1;
+                       goto out;
+               }
+       }
+       /* Without the handlers a termination signal would kill the process
+        * outright, so at least report the failure. */
+       if (setup_signals() < 0)
+               perr(PE, 0, "cannot install signal handlers");
+       /* pid_fd is -1 when no pidfile is in use; 0 is a valid descriptor */
+       if (pid_fd >= 0)
+               pidfile_write(pid_fd);
 
-       if (pfiled_init(&pfiled) < 0)
-               perr(PFE, 0, "failed to initialize pfiled");
 
-       return pfiled_loop(&pfiled);
+       if (pfiled_init(&pfiled) < 0){
+               r = -1;
+               goto out;
+       }
+
+       r = pfiled_loop(&pfiled);
+out:
+       if (pid_fd >= 0)
+               pidfile_remove(cmdline_pidfile, pid_fd);
+       return r;
+
 }