add pfiled based on peer skeleton
authorGiannakos Filippos <philipgian@grnet.gr>
Fri, 2 Nov 2012 16:41:37 +0000 (18:41 +0200)
committerGiannakos Filippos <philipgian@grnet.gr>
Fri, 2 Nov 2012 17:30:25 +0000 (19:30 +0200)
xseg/peers/user/mt-pfiled.c [new file with mode: 0644]
xseg/peers/user/peer.c

diff --git a/xseg/peers/user/mt-pfiled.c b/xseg/peers/user/mt-pfiled.c
new file mode 100644 (file)
index 0000000..f2cbebb
--- /dev/null
@@ -0,0 +1,724 @@
+/*
+ * The Pithos File Blocker Peer (pfiled)
+ */
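+
+/*
+ * pfiled serves xseg block requests from files on the local filesystem:
+ * targets are first looked up as pithos data blocks under PATH and,
+ * failing that, under VPATH, where new or modified volume blocks are
+ * created. Open file descriptors are kept in a small cache shared by
+ * the worker threads of the generic peer skeleton.
+ */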
+
+#define _GNU_SOURCE
+#include <stdio.h>
+#include <stdlib.h>
+#include <sys/types.h>
+#include <sys/stat.h>
+#include <unistd.h>
+#include <string.h>
+#include <fcntl.h>
+#include <errno.h>
+#include <aio.h>
+#include <signal.h>
+#include <limits.h>
+#include <pthread.h>
+#include <syscall.h>
+#include <sys/sendfile.h>
+#include <peer.h>
+
+#include <xseg/xseg.h>
+#include <xseg/protocol.h>
+
+#define LOCK_SUFFIX            "_lock"
+#define MAX_PATH_SIZE          1024
+#define MAX_FILENAME_SIZE      (XSEG_MAX_TARGETLEN + 5) //strlen(LOCK_SUFFIX)
+
+/* default concurrency level (number of threads) */
+#define DEFAULT_NR_OPS          16
+
+/* Pithos hash for the zero block
+ * FIXME: Should it be hardcoded?
+ */
+#define ZERO_BLOCK \
+       "e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855"
+
+void usage(char *argv0)
+{
+       fprintf(stderr,
+                       "Usage: %s --path PATH --vpath VPATH [-p PORT] [-g XSEG_SPEC] [-n NR_OPS] [-v]\n\n"
+                       "where:\n"
+                       "\tPATH: path to pithos data blocks\n"
+                       "\tVPATH: path to modified volume blocks\n"
+                       "\tPORT: xseg port to listen for requests on\n"
+                       "\tXSEG_SPEC: xseg spec as 'type:name:nr_ports:nr_requests:"
+                       "request_size:extra_size:page_shift'\n"
+                       "\tNR_OPS: number of outstanding xseg requests\n"
+                       "\t-v: verbose mode\n",
+                       argv0);
+
+}
+
+/* fdcache_node flags */
+#define READY (1 << 1)
+
+/* fdcache node info */
+struct fdcache_node {
+       volatile int fd;
+       volatile unsigned int ref;
+       volatile unsigned long time;
+       volatile unsigned int flags;
+       pthread_cond_t cond;
+       char target[MAX_FILENAME_SIZE + 1];
+};
+
+/* pfiled context */
+struct pfiled {
+       uint32_t path_len;
+       uint32_t vpath_len;
+       uint64_t handled_reqs;
+       long maxfds;
+       struct fdcache_node *fdcache;
+       pthread_mutex_t cache_lock;
+       char path[MAX_PATH_SIZE + 1];
+       char vpath[MAX_PATH_SIZE + 1];
+};
+
+/*
+ * pfiled specific structure 
+ * containing information on a pending I/O operation
+ */
+struct fio {
+       uint32_t state;
+       long fdcacheidx;
+};
+
+struct pfiled * __get_pfiled(struct peerd *peer)
+{
+       return (struct pfiled *) peer->priv;
+}
+
+struct fio * __get_fio(struct peer_req *pr)
+{
+       return (struct fio*) pr->priv;
+}
+
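+/* Drop this request's reference to its fd cache entry. If the entry has
+ * been invalidated (READY cleared, e.g. by a delete) and no references
+ * remain, close the underlying fd and free the slot.
+ */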
+static void close_cache_entry(struct peerd *peer, struct peer_req *pr)
+{
+       struct pfiled *pfiled = __get_pfiled(peer);
+       struct fio *fio = __get_fio(pr);
+       int fd = -1;
+       if (fio->fdcacheidx >= 0) {
+               if (!__sync_sub_and_fetch(&pfiled->fdcache[fio->fdcacheidx].ref, 1) && !(pfiled->fdcache[fio->fdcacheidx].flags & READY)) {
+                       pthread_mutex_lock(&pfiled->cache_lock);
+                       if (!pfiled->fdcache[fio->fdcacheidx].ref){
+                               /* invalidate cache entry */
+                               fd = pfiled->fdcache[fio->fdcacheidx].fd;
+                               pfiled->fdcache[fio->fdcacheidx].fd = -1;
+                               pfiled->fdcache[fio->fdcacheidx].target[0] = 0;
+                               pfiled->fdcache[fio->fdcacheidx].flags |= READY;
+                       }
+                       pthread_mutex_unlock(&pfiled->cache_lock);
+                       if (fd > 0)
+                               close(fd);
+
+               }
+       }
+}
+
+static void pfiled_complete(struct peerd *peer, struct peer_req *pr)
+{
+       close_cache_entry(peer, pr);
+       complete(peer, pr);
+}
+
+static void pfiled_fail(struct peerd *peer, struct peer_req *pr)
+{
+       close_cache_entry(peer, pr);
+       fail(peer, pr);
+}
+
+static void handle_unknown(struct peerd *peer, struct peer_req *pr)
+{
+       XSEGLOG2(&lc, W, "unknown request op");
+       pfiled_fail(peer, pr);
+}
+
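+/* Build the full pathname of a target under the given base path. The
+ * first six characters of the target name are spread over three two-char
+ * directory levels ("ab/cd/ef/<target>"), which are created on demand
+ * when mkdirs is set.
+ */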
+static int create_path(char *buf, char *path, char *target, uint32_t targetlen, int mkdirs)
+{
+       int i;
+       struct stat st;
+       uint32_t pathlen = strlen(path);
+
+       strncpy(buf, path, pathlen);
+
+       for (i = 0; i < 9; i+= 3) {
+               buf[pathlen + i] = target[i - (i/3)];
+               buf[pathlen + i +1] = target[i + 1 - (i/3)];
+               buf[pathlen + i + 2] = '/';
+               if (mkdirs == 1) {
+                       buf[pathlen + i + 3] = '\0';
+retry:
+                       if (stat(buf, &st) < 0) 
+                               if (mkdir(buf, 0700) < 0) {
+                                       if (errno == EEXIST)
+                                               goto retry;
+                                       perror(buf);
+                                       return errno;
+                               }
+               }
+       }
+
+       strncpy(&buf[pathlen + 9], target, targetlen);
+       buf[pathlen + 9 + targetlen] = '\0';
+
+       return 0;
+}
+
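+/* Return an open fd for the given target, going through the fd cache.
+ * On a miss, the least recently used free entry is evicted; the file is
+ * then opened from the pithos dir and, failing that, from the vpath dir,
+ * where it is created if it does not exist. The entry's reference is
+ * dropped again by close_cache_entry() when the request completes.
+ */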
+static int dir_open(struct pfiled *pfiled, struct fio *io,
+               char *target, uint32_t targetlen, int mode)
+{
+       int fd = -1;
+       struct fdcache_node *ce = NULL;
+       long i, lru;
+       char tmp[pfiled->path_len + targetlen + 10];
+       uint64_t min;
+       io->fdcacheidx = -1;
+       if (targetlen> MAX_FILENAME_SIZE)
+               goto out_err;
+
+start:
+       /* check cache */
+       pthread_mutex_lock(&pfiled->cache_lock);
+start_locked:
+       lru = -1;
+       min = UINT64_MAX;
+       for (i = 0; i < pfiled->maxfds; i++) {
+               if (pfiled->fdcache[i].ref == 0 && min > pfiled->fdcache[i].time 
+                               && (pfiled->fdcache[i].flags & READY)) {
+                       min = pfiled->fdcache[i].time;
+                       lru = i;
+
+               }
+
+               if (!strncmp(pfiled->fdcache[i].target, target, targetlen)) {
+                       if (pfiled->fdcache[i].target[targetlen] == 0) {
+                               ce = &pfiled->fdcache[i];
+                               /* if any other io thread is currently opening
+                                * the file, block until it succeeds or fails
+                                */
+                               if (!(ce->flags & READY)) {
+                                       pthread_cond_wait(&ce->cond, &pfiled->cache_lock);
+                                       /* when ready, restart lookup */
+                                       goto start_locked;
+                               }
+                               /* if successfully opened */
+                               if (ce->fd > 0) {
+                                       fd = pfiled->fdcache[i].fd;
+                                       io->fdcacheidx = i;
+                                       goto out;
+                               }
+                               /* else open failed for the other io thread, so
+                                * it should fail for everyone waiting on this
+                                * file.
+                                */
+                               else {
+                                       fd = -1;
+                                       io->fdcacheidx = -1;
+                                       goto out_err_unlock;
+                               }
+                       }
+               }
+       }
+       if (lru < 0){
+               /* all cache entries are currently being used */
+               pthread_mutex_unlock(&pfiled->cache_lock);
+               goto start;
+       }
+       if (pfiled->fdcache[lru].ref){
+               fd = -1;
+               XSEGLOG2(&lc, E, "lru(%ld) ref not 0 (%u)\n", lru, pfiled->fdcache[lru].ref);
+               goto out_err_unlock;
+       }
+       /* make room for new file */
+       ce = &pfiled->fdcache[lru];
+       /* set name here and state to not ready, for any other requests on the
+        * same target that may follow
+        */
+       strncpy(ce->target, target, targetlen);
+       ce->target[targetlen] = 0;
+       ce->flags &= ~READY;
+       pthread_mutex_unlock(&pfiled->cache_lock);
+
+       if (ce->fd >0){
+               if (close(ce->fd) < 0){
+                       XSEGLOG2(&lc, W, "Cannot close %s", ce->target);
+               }
+       }
+
+       /* try opening it from pithos blocker dir */
+       if (create_path(tmp, pfiled->path, target, targetlen, 0) < 0) {
+               fd = -1;
+               goto new_entry;
+       }
+
+       fd = open(tmp, O_RDWR);
+       if (fd < 0) {
+               /* try opening it from the tmp dir */
+               if (create_path(tmp, pfiled->vpath, target, targetlen, 0) < 0)
+                       goto new_entry;
+
+               fd = open(tmp, O_RDWR);
+               if (fd < 0)  {
+                       if (create_path(tmp, pfiled->vpath, target, targetlen, 1) < 0) {
+                               fd = -1;
+                               goto new_entry;
+                       }
+
+                       fd = open(tmp, O_RDWR | O_CREAT, 0600);         
+                       if (fd < 0)
+                               XSEGLOG2(&lc, E, "Cannot open %s", tmp);
+               }
+       }
+
+       /* Insert in cache; a negative fd indicates an opening error to
+        * any other I/Os waiting for the file to open.
+        */
+new_entry:
+       pthread_mutex_lock(&pfiled->cache_lock);
+       ce->fd = fd;
+       ce->ref = 0;
+       ce->flags = READY;
+       pthread_cond_broadcast(&ce->cond);
+       if (fd > 0) {
+               io->fdcacheidx = lru;
+       }
+       else {
+               io->fdcacheidx = -1;
+               goto out_err_unlock;
+       }
+
+out:
+       pfiled->handled_reqs++;
+       ce->time = pfiled->handled_reqs;
+       __sync_fetch_and_add(&ce->ref, 1);
+       pthread_mutex_unlock(&pfiled->cache_lock);
+out_err:
+       return fd;
+
+out_err_unlock:
+       pthread_mutex_unlock(&pfiled->cache_lock);
+       goto out_err;
+}
+
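+/* Serve X_READ/X_WRITE with pread/pwrite loops on the cached fd. Short
+ * reads past EOF are zero-filled and writes are followed by an fsync.
+ */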
+static void handle_read_write(struct peerd *peer, struct peer_req *pr)
+{
+       struct pfiled *pfiled = __get_pfiled(peer);
+       struct fio *fio = __get_fio(pr);
+       struct xseg_request *req = pr->req;
+       int r, fd;
+       char *target = xseg_get_target(peer->xseg, req);
+       char *data = xseg_get_data(peer->xseg, req);
+
+       fd = dir_open(pfiled, fio, target, req->targetlen, 0);
+       if (fd < 0){
+               XSEGLOG2(&lc, E, "Dir open failed");
+               pfiled_fail(peer, pr);
+               return;
+       }
+
+       if (!req->size) {
+               /* No FLUSH/FUA support yet (O_SYNC ?).
+                * Note that with FLUSH and size == 0 there will probably
+                * be a (uint64_t)-1 offset.
+                */
+               pfiled_complete(peer, pr);
+               return;
+       }
+
+       switch (req->op) {
+               case X_READ:
+                       while (req->serviced < req->datalen) {
+                               r = pread(fd, data + req->serviced, 
+                                               req->datalen - req->serviced,
+                                               req->offset + req->serviced);
+                               if (r < 0) {
+                                       req->datalen = req->serviced;
+                                       XSEGLOG2(&lc, E, "Cannot read");
+                               }
+                               else if (r == 0) {
+                                       /* reached end of file. zero out the rest data buffer */
+                                       memset(data + req->serviced, 0, req->datalen - req->serviced);
+                                       req->serviced = req->datalen;
+                               }
+                               else {
+                                       req->serviced += r;
+                               }
+                       }
+                       break;
+               case X_WRITE:
+                       while (req->serviced < req->datalen) {
+                               r = pwrite(fd, data + req->serviced, 
+                                               req->datalen - req->serviced,
+                                               req->offset + req->serviced);
+                               if (r < 0) {
+                                       req->datalen = req->serviced;
+                               }
+                               else {
+                                       req->serviced += r;
+                               }
+                       }
+                       r = fsync(fd);
+                       if (r< 0) {
+                               XSEGLOG2(&lc, E, "Fsync failed.");
+                               /* if fsync fails, then no bytes serviced correctly */
+                               req->serviced = 0;
+                       }
+                       break;
+               default:
+                       XSEGLOG2(&lc, E, "Invalid request op %u", req->op);
+                       pfiled_fail(peer, pr);
+                       return;
+       }
+
+       if (req->serviced > 0 ) {
+               pfiled_complete(peer, pr);
+       }
+       else {
+               pfiled_fail(peer, pr);
+       }
+       return;
+}
+
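+/* Serve X_INFO by returning the target file's size in an xseg_reply_info. */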
+static void handle_info(struct peerd *peer, struct peer_req *pr)
+{
+       struct pfiled *pfiled = __get_pfiled(peer);
+       struct fio *fio = __get_fio(pr);
+       struct xseg_request *req = pr->req;
+       struct stat st;
+       int fd, r;
+       uint64_t size;
+       char *target = xseg_get_target(peer->xseg, req);
+       char *data = xseg_get_data(peer->xseg, req);
+       struct xseg_reply_info *xinfo  = (struct xseg_reply_info *)data;
+
+       fd = dir_open(pfiled, fio, target, req->targetlen, 0);
+       if (fd < 0) {
+               XSEGLOG2(&lc, E, "Dir open failed");
+               pfiled_fail(peer, pr);
+               return;
+       }
+
+       r = fstat(fd, &st);
+       if (r < 0) {
+               XSEGLOG2(&lc, E, "fail in stat");
+               pfiled_fail(peer, pr);
+               return;
+       }
+
+       size = (uint64_t)st.st_size;
+       xinfo->size = size;
+
+       pfiled_complete(peer, pr);
+}
+
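+/* Serve X_COPY by copying the pithos block named in the request data
+ * (xseg_request_copy) into the file backing the request target, using
+ * sendfile() between the two open fds.
+ */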
+static void handle_copy(struct peerd *peer, struct peer_req *pr)
+{
+       struct pfiled *pfiled = __get_pfiled(peer);
+       struct fio *fio = __get_fio(pr);
+       struct xseg_request *req = pr->req;
+       
+       char *target = xseg_get_target(peer->xseg, req);
+       char *data = xseg_get_data(peer->xseg, req);
+       struct xseg_request_copy *xcopy = (struct xseg_request_copy *)data;
+       struct stat st;
+       char *buf = malloc(MAX_PATH_SIZE + MAX_FILENAME_SIZE);
+       int n, src = -1, dst = -1, r = -1;
+
+       if (!buf){
+               XSEGLOG2(&lc, E, "Out of memory");
+               pfiled_fail(peer, pr);
+               return;
+       }
+
+       dst = dir_open(pfiled, fio, target, req->targetlen, 1);
+       if (dst < 0) {
+               XSEGLOG2(&lc, E, "Fail in dst");
+               r = dst;
+               goto out;
+       }
+
+       if (create_path(buf, pfiled->path, xcopy->target, xcopy->targetlen, 0) < 0)  {
+               XSEGLOG2(&lc, E, "Create path failed");
+               r = -1;
+               goto out;
+       }
+
+       src = open(buf, O_RDWR);
+       if (src < 0) {
+               XSEGLOG2(&lc, E, "fail in src %s", buf);
+               r = src;
+               goto out;
+       }
+
+       r = fstat(src, &st);
+       if (r < 0){
+               XSEGLOG2(&lc, E, "fail in stat for src %s", buf);
+               goto out;
+       }
+
+       n = sendfile(dst, src, 0, st.st_size);
+       if (n != st.st_size) {
+               XSEGLOG2(&lc, E, "Copy failed for %s", buf);
+               r = -1;
+               goto out;
+       }
+       r = 0;
+
+out:
+       if (src > 0)
+               close(src);
+       free(buf);
+       if (r < 0)
+               pfiled_fail(peer, pr);
+       else
+               pfiled_complete(peer, pr);
+       return;
+}
+
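+/* Serve X_DELETE by invalidating the target's fd cache entry and
+ * unlinking its file under vpath.
+ */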
+static void handle_delete(struct peerd *peer, struct peer_req *pr)
+{
+       struct pfiled *pfiled = __get_pfiled(peer);
+       struct fio *fio = __get_fio(pr);
+       struct xseg_request *req = pr->req;
+       
+       char *buf = malloc(MAX_PATH_SIZE + MAX_FILENAME_SIZE);
+       int fd, r;
+       char *target = xseg_get_target(peer->xseg, req);
+       if (!buf){
+               XSEGLOG2(&lc, E, "Out of memory");
+               pfiled_fail(peer, pr);
+               return;
+       }
+       fd = dir_open(pfiled, fio, target, req->targetlen, 0);
+       if (fd < 0) {
+               XSEGLOG2(&lc, E, "Dir open failed");
+               r = fd;
+               goto out;
+       }
+
+       /* mark cache entry as invalid 
+        * give a chance to pending operations on this file to end.
+        * file will close when all operations are done 
+        */
+       if (fio->fdcacheidx >= 0) {
+               pthread_mutex_lock(&pfiled->cache_lock);
+               pfiled->fdcache[fio->fdcacheidx].flags &= ~READY;
+               pthread_mutex_unlock(&pfiled->cache_lock);
+       }
+
+       r = create_path(buf, pfiled->vpath, target, req->targetlen, 0);
+       if (r< 0) {
+               XSEGLOG2(&lc, E, "Create path failed");
+               goto out;
+       }
+       r = unlink(buf);
+out:
+       free(buf);
+       if (r < 0)
+               pfiled_fail(peer, pr);
+       else
+               pfiled_complete(peer, pr);
+       return;
+}
+
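+/* Serve X_OPEN by creating an exclusive <target>_lock file under vpath
+ * with O_CREAT | O_EXCL, acting as an advisory volume lock.
+ */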
+static void handle_open(struct peerd *peer, struct peer_req *pr)
+{
+       struct pfiled *pfiled = __get_pfiled(peer);
+//     struct fio *fio = __get_fio(pr);
+       struct xseg_request *req = pr->req;
+       char *buf = malloc(MAX_FILENAME_SIZE + 1);
+       char *pathname = malloc(MAX_PATH_SIZE + MAX_FILENAME_SIZE);
+       int fd = -1;
+       char *target = xseg_get_target(peer->xseg, req);
+
+       if (!buf || !pathname) {
+               XSEGLOG2(&lc, E, "Out of memory");
+               free(buf);
+               free(pathname);
+               pfiled_fail(peer, pr);
+               return;
+       }
+
+       strncpy(buf, target, req->targetlen);
+       strncpy(buf + req->targetlen, LOCK_SUFFIX, strlen(LOCK_SUFFIX));
+       buf[req->targetlen + strlen(LOCK_SUFFIX)] = 0;
+
+       if (create_path(pathname, pfiled->vpath, buf, req->targetlen + strlen(LOCK_SUFFIX), 1) < 0) {
+               XSEGLOG2(&lc, E, "Create path failed for %s", buf);
+               goto out;
+       }
+
+       /* O_EXCL on open is only reliable over NFSv3 and later */
+       fd = open(pathname, O_CREAT | O_EXCL, S_IRWXU | S_IRUSR);
+       if (fd < 0){
+               /* EEXIST only means the lock is already held; log real errors */
+               if (errno != EEXIST)
+                       XSEGLOG2(&lc, W, "Error opening %s", pathname);
+               goto out;
+       }
+       close(fd);
+out:
+       free(buf);
+       free(pathname);
+       if (fd < 0)
+               pfiled_fail(peer, pr);
+       else
+               pfiled_complete(peer, pr);
+       return;
+}
+
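+/* Serve X_CLOSE by removing the <target>_lock file created by X_OPEN. */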
+static void handle_close(struct peerd *peer, struct peer_req *pr)
+{
+       struct pfiled *pfiled = __get_pfiled(peer);
+//     struct fio *fio = __get_fio(pr);
+       struct xseg_request *req = pr->req;
+       char *buf = malloc(MAX_FILENAME_SIZE + 1);
+       char *pathname = malloc(MAX_PATH_SIZE + MAX_FILENAME_SIZE);
+       char *target = xseg_get_target(peer->xseg, req);
+       int r;
+
+       if (!buf || !pathname) {
+               XSEGLOG2(&lc, E, "Out of memory");
+               free(buf);
+               free(pathname);
+               fail(peer, pr);
+               return;
+       }
+
+       strncpy(buf, target, req->targetlen);
+       strncpy(buf + req->targetlen, LOCK_SUFFIX, strlen(LOCK_SUFFIX));
+       buf[req->targetlen + strlen(LOCK_SUFFIX)] = 0;
+
+       r = create_path(pathname, pfiled->vpath, buf, req->targetlen + strlen(LOCK_SUFFIX), 0);
+       if (r < 0) {
+               XSEGLOG2(&lc, E, "Create path failed for %s", buf);
+               goto out;
+       }
+       r = unlink(pathname);
+
+out:
+       free(buf);
+       free(pathname);
+       if (r < 0)
+               fail(peer, pr);
+       else
+               complete(peer, pr);
+       return;
+}
+
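+/* Entry point called by the peer skeleton: route each request to the
+ * handler matching its op.
+ */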
+int dispatch(struct peerd *peer, struct peer_req *pr, struct xseg_request *req,
+                               enum dispatch_reason reason)
+{
+       struct fio *fio = __get_fio(pr);
+       if (reason == dispatch_accept){
+               fio->fdcacheidx = -1;
+               fio->state = XS_ACCEPTED;
+       }
+       
+       switch (req->op) {
+               case X_READ:
+               case X_WRITE:
+                       handle_read_write(peer, pr); break;
+               case X_INFO:
+                       handle_info(peer, pr); break;
+               case X_COPY:
+                       handle_copy(peer, pr); break;
+               case X_DELETE:
+                       handle_delete(peer, pr); break;
+               case X_OPEN:
+                       handle_open(peer, pr); break;
+               case X_CLOSE:
+                       handle_close(peer, pr); break;
+                       //      case X_SNAPSHOT:
+               case X_SYNC:
+               default:
+                       handle_unknown(peer, pr);
+       }
+       return 0;
+}
+
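+/* Parse --path/--vpath, allocate the pfiled context and the fd cache,
+ * and attach a struct fio to every pre-allocated peer request.
+ */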
+int custom_peer_init(struct peerd *peer, int argc, char *argv[])
+{
+       int ret = 0;
+       int i;
+       struct pfiled *pfiled = malloc(sizeof(struct pfiled));
+       if (!pfiled){
+               XSEGLOG2(&lc, E, "Out of memory");
+               ret = -ENOMEM;
+               goto out;
+       }
+       peer->priv = pfiled;
+
+       pfiled->maxfds = 2 * peer->nr_ops;
+       pfiled->fdcache = calloc(pfiled->maxfds, sizeof(struct fdcache_node));
+       if(!pfiled->fdcache) {
+               XSEGLOG2(&lc, E, "Out of memory");
+               ret = -ENOMEM;
+               goto out;
+       }
+
+       for (i = 0; i < peer->nr_ops; i++) {
+               peer->peer_reqs[i].priv = malloc(sizeof(struct fio));
+               if (!peer->peer_reqs[i].priv){
+                       XSEGLOG2(&lc, E, "Out of memory");
+                       ret = -ENOMEM;
+                       goto out;
+               }
+       }
+
+       pfiled->handled_reqs = 0;
+       pfiled->path[0] = 0;
+       pfiled->vpath[0] = 0;
+       for (i = 0; i < argc; i++) {
+               if (!strcmp(argv[i], "--path") && (i+1) < argc){
+                       strncpy(pfiled->path, argv[i+1], MAX_PATH_SIZE);
+                       pfiled->path[MAX_PATH_SIZE] = 0;
+                       i += 1;
+                       continue;
+               }
+               if (!strcmp(argv[i], "--vpath") && (i+1) < argc){
+                       strncpy(pfiled->vpath, argv[i+1], MAX_PATH_SIZE);
+                       pfiled->vpath[MAX_PATH_SIZE] = 0;
+                       i += 1;
+                       continue;
+               }
+       }
+
+       if (!pfiled->path[0] || !pfiled->vpath[0]) {
+               XSEGLOG2(&lc, E, "Both --path and --vpath must be provided");
+               ret = -EINVAL;
+               goto out;
+       }
+
+       pfiled->path_len = strlen(pfiled->path);
+       if (pfiled->path[pfiled->path_len -1] != '/'){
+               pfiled->path[pfiled->path_len] = '/';
+               pfiled->path[++pfiled->path_len]= 0;
+       }
+
+       pfiled->vpath_len = strlen(pfiled->vpath);
+       if (pfiled->vpath[pfiled->vpath_len -1] != '/'){
+               pfiled->vpath[pfiled->vpath_len] = '/';
+               pfiled->vpath[++pfiled->vpath_len]= 0;
+       }
+
+       for (i = 0; i < pfiled->maxfds; i++) {
+               pthread_cond_init(&pfiled->fdcache[i].cond, NULL);
+               pfiled->fdcache[i].flags = READY;
+       }
+       pthread_mutex_init(&pfiled->cache_lock, NULL);
+
+out:
+       return ret;
+}
+
+static int safe_atoi(char *s)
+{
+       long l;
+       char *endp;
+
+       l = strtol(s, &endp, 10);
+       if (s != endp && *endp == '\0')
+               return l;
+       else
+               return -1;
+}
index 85e78ec..779beaf 100644 (file)
@@ -398,6 +398,7 @@ static void* thread_loop(void *arg)
                xseg_cancel_wait(xseg, peer->portno_start);
                XSEGLOG2(&lc, I, "Thread %u woke up\n", (unsigned int) (t- peer->thread));
        }
+       wake_up_next_thread(peer);
        return NULL;
 }