Add posixfd signaling
authorFilippos Giannakos <philipgian@grnet.gr>
Thu, 29 Aug 2013 10:04:40 +0000 (13:04 +0300)
committerFilippos Giannakos <philipgian@grnet.gr>
Tue, 28 Jan 2014 15:02:45 +0000 (17:02 +0200)
21 files changed:
xseg/drivers/kernel/xseg_posix.c
xseg/drivers/kernel/xseg_pthread.c
xseg/drivers/kernel/xseg_segdev.c
xseg/drivers/user/Makefile
xseg/drivers/user/xseg_posix.c
xseg/drivers/user/xseg_posixfd.c [new file with mode: 0644]
xseg/drivers/user/xseg_pthread.c
xseg/drivers/user/xseg_segdev.c
xseg/drivers/xseg_posixfd.h [new file with mode: 0644]
xseg/peers/user/Makefile
xseg/peers/user/bench-xseg.c
xseg/peers/user/mapper.c
xseg/peers/user/peer.c
xseg/peers/user/peer.h
xseg/sys/user/Makefile
xseg/sys/user/python/Makefile
xseg/tools/archipelago/archipelago/common.py
xseg/tools/qa/tests.py
xseg/xseg/xseg.c
xseg/xseg/xseg.h
xseg/xseg/xseg_exports.h

index 379ccaa..cc130f4 100644 (file)
@@ -77,7 +77,7 @@ static int posix_cancel_wait(struct xseg *xseg, uint32_t portno)
        return -1;
 }
 
-static int posix_wait_signal(struct xseg *xseg, uint32_t timeout)
+static int posix_wait_signal(struct xseg *xseg, void *sd, uint32_t timeout)
 {
        return -1;
 }
index 395ecd8..37a7f81 100644 (file)
@@ -78,7 +78,7 @@ static int pthread_cancel_wait(struct xseg *xseg, uint32_t portno)
        return -1;
 }
 
-static int pthread_wait_signal(struct xseg *xseg, uint32_t timeout)
+static int pthread_wait_signal(struct xseg *xseg, void *sd, uint32_t timeout)
 {
        return -1;
 }
index 4d4abf7..a11cee4 100644 (file)
@@ -276,7 +276,7 @@ static int segdev_cancel_wait(struct xseg *xseg, uint32_t portno)
        return -0;
 }
 
-static int segdev_wait_signal(struct xseg *xseg, uint32_t timeout)
+static int segdev_wait_signal(struct xseg *xseg, void *sd, uint32_t timeout)
 {
        return -1;
 }
index c24ca3d..c82c3e6 100644 (file)
@@ -43,7 +43,7 @@ FILES+=$(shell ls *.c)
 SUBDIR:=$(subst $(XSEG_HOME),,$(CURDIR))
 
 
-DRIVERS=xseg_posix xseg_segdev xseg_pthread
+DRIVERS=xseg_posix xseg_segdev xseg_pthread xseg_posixfd
 DRVOBJS=$(DRIVERS:=.o)
 DRVSOS=$(DRIVERS:=.so)
 
@@ -72,6 +72,12 @@ xseg_pthread.o: xseg_pthread.c $(BASE)/xseg/xseg.h $(BASE)/drivers/xseg_pthread.
 xseg_pthread.so: xseg_pthread.o $(BASE)/sys/user/xseg_user.o
        $(CC) -shared -lpthread -o $@ $< $(BASE)/sys/user/xseg_user.o 
 
+xseg_posixfd.o: xseg_posixfd.c $(BASE)/xseg/xseg.h $(BASE)/drivers/xseg_posixfd.h
+       $(CC) $(CFLAGS) $(INC) -fPIC -c -o $@ $<
+
+xseg_posixfd.so: xseg_posixfd.o $(BASE)/sys/user/xseg_user.o
+       $(CC) -shared -o $@ $< $(BASE)/sys/user/xseg_user.o
+
 .PHONY: lib
 lib:
        cp -vaf $(DRVSOS) $(LIB)
index 796822f..8be3597 100644 (file)
@@ -195,7 +195,7 @@ static int posix_cancel_wait(struct xseg *xseg, uint32_t portno)
        return 0;
 }
 
-static int posix_wait_signal(struct xseg *xseg, uint32_t usec_timeout)
+static int posix_wait_signal(struct xseg *xseg, void *sd, uint32_t usec_timeout)
 {
        int r;
        siginfo_t siginfo;
diff --git a/xseg/drivers/user/xseg_posixfd.c b/xseg/drivers/user/xseg_posixfd.c
new file mode 100644 (file)
index 0000000..fad0729
--- /dev/null
@@ -0,0 +1,493 @@
+/*
+ * Copyright 2012 GRNET S.A. All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or
+ * without modification, are permitted provided that the following
+ * conditions are met:
+ *
+ *   1. Redistributions of source code must retain the above
+ *      copyright notice, this list of conditions and the following
+ *      disclaimer.
+ *   2. Redistributions in binary form must reproduce the above
+ *      copyright notice, this list of conditions and the following
+ *      disclaimer in the documentation and/or other materials
+ *      provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
+ * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+ * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
+ * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
+ * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
+ * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
+ * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
+ * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
+ * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+ * POSSIBILITY OF SUCH DAMAGE.
+ *
+ * The views and conclusions contained in the software and
+ * documentation are those of the authors and should not be
+ * interpreted as representing official policies, either expressed
+ * or implied, of GRNET S.A.
+ */
+
+#define _GNU_SOURCE
+#include <stdio.h>
+#include <stdlib.h>
+#include <unistd.h>
+#include <sys/types.h>
+#include <sys/time.h>
+#include <sys/select.h>
+#include <sys/stat.h>
+#include <sys/mman.h>
+#include <sys/syscall.h>
+#include <fcntl.h>
+#include <errno.h>
+#include <string.h>
+#include <signal.h>
+#include <sys/util.h>
+#include <xseg/xseg.h>
+#include <xtypes/xobj.h>
+#include <drivers/xseg_posixfd.h>
+#define ERRSIZE 512
+char errbuf[ERRSIZE];
+
+static long posixfd_allocate(const char *name, uint64_t size)
+{
+       int fd, r;
+       fd = shm_open(name, O_RDWR | O_CREAT | O_EXCL, 0770);
+       if (fd < 0) {
+               XSEGLOG("Cannot create shared segment: %s\n",
+                       strerror_r(errno, errbuf, ERRSIZE));
+               return fd;
+       }
+
+       r = lseek(fd, size -1, SEEK_SET);
+       if (r < 0) {
+               close(fd);
+               XSEGLOG("Cannot seek into segment file: %s\n",
+                       strerror_r(errno, errbuf, ERRSIZE));
+               return r;
+       }
+
+       errbuf[0] = 0;
+       r = write(fd, errbuf, 1);
+       if (r != 1) {
+               close(fd);
+               XSEGLOG("Failed to set segment size: %s\n",
+                       strerror_r(errno, errbuf, ERRSIZE));
+               return r;
+       }
+
+       close(fd);
+       return 0;
+}
+
+static long posixfd_deallocate(const char *name)
+{
+       return shm_unlink(name);
+}
+
+static void *posixfd_map(const char *name, uint64_t size, struct xseg *seg)
+{
+       struct xseg *xseg;
+       int fd;
+
+       fd = shm_open(name, O_RDWR, 0000);
+       if (fd < 0) {
+               XSEGLOG("Failed to open '%s' for mapping: %s\n",
+                       name, strerror_r(errno, errbuf, ERRSIZE));
+               return NULL;
+       }
+
+       xseg = mmap (   XSEG_BASE_AS_PTR,
+                       size,
+                       PROT_READ | PROT_WRITE,
+                       MAP_SHARED | MAP_FIXED /* | MAP_LOCKED */,
+                       fd, 0   );
+
+       if (xseg == MAP_FAILED) {
+               XSEGLOG("Could not map segment: %s\n",
+                       strerror_r(errno, errbuf, ERRSIZE));
+               return NULL;
+       }
+
+       close(fd);
+       return xseg;
+}
+
+static void posixfd_unmap(void *ptr, uint64_t size)
+{
+       struct xseg *xseg = ptr;
+       (void)munmap(xseg, xseg->segment_size);
+}
+
+static struct posixfd_signal_desc * __get_signal_desc(struct xseg *xseg, xport portno)
+{
+       struct xseg_port *port = xseg_get_port(xseg, portno);
+       if (!port)
+               return NULL;
+       struct posixfd_signal_desc *psd = xseg_get_signal_desc(xseg, port);
+       if (!psd)
+               return NULL;
+       return psd;
+}
+
+static void __get_filename(struct posixfd_signal_desc *psd, char *filename)
+{
+       int pos = 0;
+       strncpy(filename+pos, POSIXFD_DIR, POSIXFD_DIR_LEN);
+       pos += POSIXFD_DIR_LEN;
+       strncpy(filename + pos, psd->signal_file, POSIXFD_FILENAME_LEN);
+       pos += POSIXFD_FILENAME_LEN;
+       filename[pos] = 0;
+}
+
+/*
+ * In order to be able to accept signals we must:
+ *
+ * a) Create the name piped for our signal descriptor.
+ * b) Open the named pipe and get an fd.
+ */
+static int posixfd_local_signal_init(struct xseg *xseg, xport portno)
+{
+       /* create or truncate POSIXFD+portno file */
+       int r, fd;
+       char filename[POSIXFD_DIR_LEN + POSIXFD_FILENAME_LEN + 1];
+
+       struct posixfd_signal_desc *psd = __get_signal_desc(xseg, portno);
+       if (!psd) {
+               return -1;
+       }
+       __get_filename(psd, filename);
+
+retry:
+       r = mkfifo(filename, S_IRUSR|S_IWUSR);
+       if (r < 0) {
+               if (errno == EEXIST) {
+                       unlink(filename);
+                       goto retry;
+               }
+               return -1;
+       }
+
+       fd = open(filename, O_RDONLY | O_NONBLOCK);
+       if (fd < 0) {
+               unlink(filename);
+               return -1;
+       }
+       psd->fd = fd;
+       open(filename, O_WRONLY | O_NONBLOCK);
+
+       return 0;
+}
+
+/*
+ * To clean up after our signal initialiazation, we should:
+ *
+ * a) close the open fd for our named pipe
+ * b) unlink the named pipe from the file system.
+ */
+static void posixfd_local_signal_quit(struct xseg *xseg, xport portno)
+{
+       char filename[POSIXFD_DIR_LEN + POSIXFD_FILENAME_LEN + 1];
+       struct posixfd_signal_desc *psd = __get_signal_desc(xseg, portno);
+       if (psd->fd >=0) {
+               close(psd->fd);
+               psd->fd = -1;
+       }
+       __get_filename(psd, filename);
+       unlink(filename);
+       return;
+}
+
+/*
+ * When this peer type is initialized, we must make sure the directory where the
+ * named pipes will be created, exist.
+ */
+static int posixfd_remote_signal_init(void)
+{
+       int r;
+       r = mkdir(POSIXFD_DIR, 01755);
+
+       if (r < 0) {
+               if (errno != EEXIST) // && isdir(POSIXFD_DIR)
+                       return -1;
+       }
+
+       return 0;
+}
+
+static void posixfd_remote_signal_quit(void)
+{
+       return;
+}
+
+static int posixfd_prepare_wait(struct xseg *xseg, uint32_t portno)
+{
+       char buf[512];
+       int buf_size = 512;
+       struct posixfd_signal_desc *psd = __get_signal_desc(xseg, portno);
+       if (!psd)
+               return -1;
+       psd->flag = 1;
+       while (read(psd->fd, buf, buf_size) > 0);
+
+       return 0;
+}
+
+static int posixfd_cancel_wait(struct xseg *xseg, uint32_t portno)
+{
+       char buf[512];
+       int buf_size = 512;
+       struct posixfd_signal_desc *psd = __get_signal_desc(xseg, portno);
+       if (!psd)
+               return -1;
+       psd->flag = 0;
+       while (read(psd->fd, buf, buf_size) > 0);
+
+       return 0;
+}
+
+/*
+ * To wait a signal, the posixfd peer must use select on the fd of its named
+ * pipe.
+ *
+ * When the peer wakes up from the select, if it wasn't waked up because of a
+ * timeout, it should read as much as it can from the named pipe to clean it and
+ * prepare it for the next select.
+ */
+static int posixfd_wait_signal(struct xseg *xseg, void *sd, uint32_t usec_timeout)
+{
+       int r;
+       struct timeval tv;
+       char buf[512];
+       int buf_size = 512;
+       fd_set fds;
+
+       struct posixfd_signal_desc *psd = (struct posixfd_signal_desc *)sd;
+       if (!psd)
+               return -1;
+
+       tv.tv_sec = usec_timeout / 1000000;
+       tv.tv_usec = usec_timeout - tv.tv_sec * 1000000;
+
+       FD_ZERO(&fds);
+       FD_SET(psd->fd, &fds);
+
+       r = select(psd->fd + 1, &fds, NULL, NULL, &tv);
+       //XSEGLOG("Tv sec: %ld, tv_usec: %ld", tv.tv_sec, tv.tv_usec);
+
+       if (r < 0) {
+               if (errno != EINTR) {
+                       return -1;
+               } else {
+                       return 0;
+               }
+       }
+
+       if (r != 0) {
+               /* clean up pipe */
+               while (read(psd->fd, buf, buf_size) > 0);
+       }
+
+       return 0;
+}
+
+/*
+ * To signal a posixfd peer, we must:
+ *
+ * a) Check if the peer wants to be signaled.
+ * b) Open the named pipe, it provides.
+ * c) Write some data to the named pipe, so the peer's fd will be selectable for
+ *    writing.
+ * d) Close the named pipe.
+ */
+static int posixfd_signal(struct xseg *xseg, uint32_t portno)
+{
+       int r, fd;
+       /* NULL terminated */
+       char filename[POSIXFD_DIR_LEN + POSIXFD_FILENAME_LEN + 1] = POSIXFD_DIR;
+
+       struct posixfd_signal_desc *psd = __get_signal_desc(xseg, portno);
+       if (!psd)
+               return -1;
+
+       if (!psd->flag) {
+               /* If the peer advises not to signal, we respect it. */
+               return 0;
+       }
+       __get_filename(psd, filename);
+
+       fd = open(filename, O_WRONLY|O_NONBLOCK);
+       if (fd < 0) {
+               return -1;
+       }
+       r = write(fd, "a", 1);
+       if (r < 0) {
+               close(fd);
+               return -1;
+       }
+       /* FIXME what here? */
+       r = close(fd);
+
+       return 0;
+}
+
+static void *posixfd_malloc(uint64_t size)
+{
+       return malloc((size_t)size);
+}
+
+static void *posixfd_realloc(void *mem, uint64_t size)
+{
+       return realloc(mem, (size_t)size);
+}
+
+static void posixfd_mfree(void *mem)
+{
+       free(mem);
+}
+
+/* taken from user/hash.c */
+static char get_hex(unsigned int h)
+{
+       switch (h)
+       {
+               case 0:
+               case 1:
+               case 2:
+               case 3:
+               case 4:
+               case 5:
+               case 6:
+               case 7:
+               case 8:
+               case 9:
+                       return h + '0';
+               case 10:
+                       return 'a';
+               case 11:
+                       return 'b';
+               case 12:
+                       return 'c';
+               case 13:
+                       return 'd';
+               case 14:
+                       return 'e';
+               case 15:
+                       return 'f';
+       }
+       /* not reachable */
+       return '0';
+}
+
+static void hexlify(unsigned char *data, long datalen, char *hex)
+{
+       long i;
+       for (i=0; i<datalen; i++){
+               hex[2*i] = get_hex((data[i] & 0xF0) >> 4);
+               hex[2*i + 1] = get_hex(data[i] & 0x0F);
+       }
+}
+
+
+
+int posixfd_init_signal_desc(struct xseg *xseg, void *sd)
+{
+       struct posixfd_signal_desc *psd = sd;
+       if (!psd)
+               return -1;
+       psd->flag = 0;
+       psd->signal_file[0] = 0;
+       hexlify(&sd, POSIXFD_FILENAME_LEN, psd->signal_file);
+       psd->fd = -1;
+
+       return 0;
+}
+
+void posixfd_quit_signal_desc(struct xseg *xseg, void *sd)
+{
+       return;
+}
+
+void * posixfd_alloc_data(struct xseg *xseg)
+{
+       struct xobject_h *sd_h = xseg_get_objh(xseg, MAGIC_POSIX_SD,
+                       sizeof(struct posixfd_signal_desc));
+       return sd_h;
+}
+
+void posixfd_free_data(struct xseg *xseg, void *data)
+{
+       if (data)
+               xseg_put_objh(xseg, (struct xobject_h *)data);
+}
+
+void *posixfd_alloc_signal_desc(struct xseg *xseg, void *data)
+{
+       struct xobject_h *sd_h = (struct xobject_h *) data;
+       if (!sd_h)
+               return NULL;
+       struct posixfd_signal_desc *psd = xobj_get_obj(sd_h, X_ALLOC);
+       if (!psd)
+               return NULL;
+       return psd;
+
+}
+
+void posixfd_free_signal_desc(struct xseg *xseg, void *data, void *sd)
+{
+       struct xobject_h *sd_h = (struct xobject_h *) data;
+       if (!sd_h)
+               return;
+       if (sd)
+               xobj_put_obj(sd_h, sd);
+       return;
+}
+
+static struct xseg_type xseg_posixfd = {
+       /* xseg_operations */
+       {
+               .mfree          = posixfd_mfree,
+               .allocate       = posixfd_allocate,
+               .deallocate     = posixfd_deallocate,
+               .map            = posixfd_map,
+               .unmap          = posixfd_unmap,
+       },
+       /* name */
+       "posixfd"
+};
+
+static struct xseg_peer xseg_peer_posixfd = {
+       /* xseg_peer_operations */
+       {
+               .init_signal_desc   = posixfd_init_signal_desc,
+               .quit_signal_desc   = posixfd_quit_signal_desc,
+               .alloc_data         = posixfd_alloc_data,
+               .free_data          = posixfd_free_data,
+               .alloc_signal_desc  = posixfd_alloc_signal_desc,
+               .free_signal_desc   = posixfd_free_signal_desc,
+               .local_signal_init  = posixfd_local_signal_init,
+               .local_signal_quit  = posixfd_local_signal_quit,
+               .remote_signal_init = posixfd_remote_signal_init,
+               .remote_signal_quit = posixfd_remote_signal_quit,
+               .prepare_wait       = posixfd_prepare_wait,
+               .cancel_wait        = posixfd_cancel_wait,
+               .wait_signal        = posixfd_wait_signal,
+               .signal             = posixfd_signal,
+               .malloc             = posixfd_malloc,
+               .realloc            = posixfd_realloc,
+               .mfree              = posixfd_mfree,
+       },
+       /* name */
+       "posixfd"
+};
+
+void xseg_posixfd_init(void)
+{
+       xseg_register_type(&xseg_posixfd);
+       xseg_register_peer(&xseg_peer_posixfd);
+}
+
index 8c6ddd9..1ef1bf6 100644 (file)
@@ -333,7 +333,7 @@ static int pthread_cancel_wait(struct xseg *xseg, uint32_t portno)
        return 0;
 }
 
-static int pthread_wait_signal(struct xseg *xseg, uint32_t usec_timeout)
+static int pthread_wait_signal(struct xseg *xseg, void *sd, uint32_t usec_timeout)
 {
        int r;
        siginfo_t siginfo;
index 7f1673c..9ed407a 100644 (file)
@@ -210,7 +210,7 @@ static int segdev_cancel_wait(struct xseg *xseg, uint32_t portno)
        return -1;
 }
 
-static int segdev_wait_signal(struct xseg *xseg, uint32_t timeout)
+static int segdev_wait_signal(struct xseg *xseg, void *sd, uint32_t timeout)
 {
        return -1;
 }
diff --git a/xseg/drivers/xseg_posixfd.h b/xseg/drivers/xseg_posixfd.h
new file mode 100644 (file)
index 0000000..d6f4af3
--- /dev/null
@@ -0,0 +1,58 @@
+/*
+ * Copyright 2012 GRNET S.A. All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or
+ * without modification, are permitted provided that the following
+ * conditions are met:
+ *
+ *   1. Redistributions of source code must retain the above
+ *      copyright notice, this list of conditions and the following
+ *      disclaimer.
+ *   2. Redistributions in binary form must reproduce the above
+ *      copyright notice, this list of conditions and the following
+ *      disclaimer in the documentation and/or other materials
+ *      provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
+ * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+ * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
+ * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
+ * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
+ * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
+ * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
+ * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
+ * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+ * POSSIBILITY OF SUCH DAMAGE.
+ *
+ * The views and conclusions contained in the software and
+ * documentation are those of the authors and should not be
+ * interpreted as representing official policies, either expressed
+ * or implied, of GRNET S.A.
+ */
+
+#ifdef __KERNEL__
+#include <linux/types.h>
+#else
+#include <stdint.h>
+#endif
+
+#define MAGIC_POSIX_SD 9
+
+/* Must always end with a "/" */
+#define POSIXFD_DIR "/dev/shm/posixfd/"
+/* Must be the len of POSIXFD_DIR without the \0 */
+#define POSIXFD_DIR_LEN 17
+
+struct posixfd_signal_desc {
+       /* hexlified value of xport */
+       /* FIXME include xseg_types or sth and use sizeof(xport) */
+       char signal_file[sizeof(void *)];
+       int fd;
+       /* whether or not, the port should be signaled */
+       int flag;
+};
+
+#define POSIXFD_FILENAME_LEN \
+               (sizeof(((struct posixfd_signal_desc *)0)->signal_file))
index 01fe5dd..df1efb3 100644 (file)
@@ -36,7 +36,8 @@
 
 include $(XSEG_HOME)/base.mk
 #PEERS := xseg archip-sosd archip-mapperd archip-vlmcd archip-filed archip-bench
-PEERS := xseg archip-sosd archip-mapperd archip-vlmcd archip-filed archip-bench archip-dummy
+#PEERS := xseg archip-sosd archip-mapperd archip-vlmcd archip-filed archip-bench archip-dummy
+PEERS := archip-sosd archip-mapperd archip-vlmcd archip-filed archip-bench archip-dummy archip-benchfd
 
 FILES="Makefile"
 FILES+=$(shell ls *.h)
@@ -89,6 +90,9 @@ BENCH_PREQ=bench-xseg.c peer.c bench-lfsr.c bench-timer.c bench-utils.c \
 archip-bench: $(BENCH_PREQ)
        $(CC) $(CFLAGS) -o $@ $(CPREQS) $(INC) -L$(LIB) -lxseg -lpthread -lm
 
+archip-benchfd: $(BENCH_PREQ)
+       $(CC) $(CFLAGS) -o $@ $(CPREQS) $(INC) -L$(LIB) -lxseg -lpthread -lm -DFD
+
 archip-filed: filed.c peer.c peer.h hash.c hash.h $(BASE)/xseg/protocol.h
        $(CC) $(CFLAGS) -o $@ $(CPREQS) $(INC) -L$(LIB) -lxseg -lpthread -DMT \
                                                        -lcrypto
index a55add0..0e83ff5 100644 (file)
@@ -907,7 +907,7 @@ send_request:
                        }
                }
                XSEGLOG2(&lc, I, "%s goes to sleep\n", id);
-               xseg_wait_signal(xseg, 10000000UL);
+               xseg_wait_signal(xseg, peer->sd, 10000000UL);
                xseg_cancel_wait(xseg, peer->portno_start);
                XSEGLOG2(&lc, I, "%s woke up\n", id);
        }
index 6e6b882..a092446 100644 (file)
@@ -1586,7 +1586,7 @@ int wait_reply(struct peerd *peer, struct xseg_request *expected_req)
                                }
                        }
                }
-               xseg_wait_signal(xseg, 1000000UL);
+               xseg_wait_signal(xseg, peer->sd, 1000000UL);
        }
 }
 
index 058955a..7da6b5c 100644 (file)
@@ -60,6 +60,8 @@
 
 #ifdef MT
 #define PEER_TYPE "pthread"
+#elif defined(FD)
+#define PEER_TYPE "posixfd"
 #else
 #define PEER_TYPE "posix"
 #endif
@@ -506,7 +508,7 @@ static void* thread_loop(void *arg)
                                loops = threshold;
                }
                XSEGLOG2(&lc, I, "Thread %u goes to sleep\n", (unsigned int) (t- peer->thread));
-               xseg_wait_signal(xseg, 10000000UL);
+               xseg_wait_signal(xseg, peer->sd, 10000000UL);
                xseg_cancel_wait(xseg, peer->portno_start);
                XSEGLOG2(&lc, I, "Thread %u woke up\n", (unsigned int) (t- peer->thread));
        }
@@ -654,7 +656,7 @@ static int generic_peerd_loop(void *arg)
                }
 #endif
                XSEGLOG2(&lc, I, "%s goes to sleep\n", id);
-               xseg_wait_signal(xseg, 10000000UL);
+               xseg_wait_signal(xseg, peer->sd, 10000000UL);
                xseg_cancel_wait(xseg, peer->portno_start);
                XSEGLOG2(&lc, I, "%s woke up\n", id);
        }
@@ -773,14 +775,15 @@ malloc_fail:
         * The first port we bind will have its signal_desc initialized by xseg
         * and the same signal_desc will be used for all the other ports.
         */
+       peer->sd = NULL;
        for (p = peer->portno_start; p <= peer->portno_end; p++) {
-               port = xseg_bind_port(peer->xseg, p, sd);
+               port = xseg_bind_port(peer->xseg, p, peer->sd);
                if (!port){
                        printf("cannot bind to port %u\n", (unsigned int) p);
                        return NULL;
                }
                if (p == peer->portno_start)
-                       sd = xseg_get_signal_desc(peer->xseg, port);
+                       peer->sd = xseg_get_signal_desc(peer->xseg, port);
        }
 
        printf("Peer on ports  %u-%u\n", peer->portno_start, peer->portno_end);
index cae484b..4651c7d 100644 (file)
@@ -111,6 +111,7 @@ struct peerd {
        struct peer_req *peer_reqs;
        struct xq free_reqs;
        int (*peerd_loop)(void *arg);
+       void *sd;
        void *priv;
 #ifdef MT
        uint32_t nr_threads;
index c4081e9..186d6c6 100644 (file)
@@ -48,7 +48,7 @@ MAJOR=0
 MINOR=2
 AR=ar
 
-DRIVERS=xseg_posix xseg_segdev xseg_pthread
+DRIVERS=xseg_posix xseg_segdev xseg_pthread xseg_posixfd
 DRVDIR=$(BASE)/drivers/user
 DRVOBJS=$(addsuffix .o, $(addprefix $(DRVDIR)/, $(DRIVERS)))
 SHELL=/bin/bash
index 6f4697f..47a6286 100644 (file)
@@ -69,6 +69,7 @@ xseg/xseg_api.py: xseg.xml
        $(XML2PY) -d -k defst -l $(BASE)/lib/user/libxseg.so.$(MAJOR) -c $< -o $@
        sed -i -e 's/xseg_get_data_nonstatic.restype = STRING/xseg_get_data_nonstatic.restype = POINTER(c_char)/g' xseg/xseg_api.py
        sed -i -e 's/xseg_get_target_nonstatic.restype = STRING/xseg_get_target_nonstatic.restype = POINTER(c_char)/g' xseg/xseg_api.py
+       sed -i -e 's/xseg_get_signal_desc_nonstatic.restype = STRING/xseg_get_signal_desc_nonstatic.restype = POINTER(c_char)/g' xseg/xseg_api.py
        sed -i -e 's/STRING = 1//g' xseg/xseg_api.py
        sed -i -e 's,$(BASE)/lib/user/libxseg.so.$(MAJOR),libxseg.so.$(MAJOR),g' xseg/xseg_api.py
 
index 4f2497e..8b64d19 100755 (executable)
@@ -38,7 +38,7 @@
 from xseg.xseg_api import *
 from xseg.xprotocol import *
 from ctypes import CFUNCTYPE, cast, c_void_p, addressof, string_at, memmove, \
-    create_string_buffer, pointer, sizeof, POINTER, byref
+    create_string_buffer, pointer, sizeof, POINTER, byref, c_int, c_char, Structure
 import ctypes
 cb_null_ptrtype = CFUNCTYPE(None, uint32_t)
 
@@ -52,7 +52,7 @@ from subprocess import check_call
 from collections import namedtuple
 import socket
 import random
-
+from select import select
 
 random.seed()
 hostname = socket.gethostname()
@@ -94,6 +94,29 @@ VLMC = 'archip-vlmcd'
 def is_power2(x):
     return bool(x != 0 and (x & (x-1)) == 0)
 
+#hack to test green waiting with python gevent.
+class posixfd_signal_desc(Structure):
+    pass
+posixfd_signal_desc._fields_ = [
+    ('signal_file', c_char * sizeof(c_void_p)),
+    ('fd', c_int),
+    ('flag', c_int),
+]
+
+def xseg_wait_signal_green(ctx, sd, timeout):
+    posixfd_sd = cast(sd, POINTER(posixfd_signal_desc))
+    fd = posixfd_sd.contents.fd
+    select([fd], [], [], timeout/1000000.0)
+    while True:
+        try:
+            os.read(fd, 512)
+        except OSError as (e,msg):
+            if e == 11:
+                break
+            else:
+                raise OSError(e, msg)
+
+
 class Peer(object):
     cli_opts = None
 
@@ -472,7 +495,7 @@ class Segment(object):
         xconf = xseg_config()
         spec_buf = create_string_buffer(self.spec)
         xseg_parse_spec(spec_buf, xconf)
-        ctx = xseg_join(xconf.type, xconf.name, "posix",
+        ctx = xseg_join(xconf.type, xconf.name, "posixfd",
                         cast(0, cb_null_ptrtype))
         if not ctx:
             raise Error("Cannot join segment")
@@ -724,11 +747,11 @@ def check_pidfile(name):
 
     return pid
 
-
 class Xseg_ctx(object):
     ctx = None
     port = None
     portno = None
+    signal_desc = None
 
     def __init__(self, segment, portno):
         ctx = segment.join()
@@ -737,10 +760,14 @@ class Xseg_ctx(object):
         port = xseg_bind_port(ctx, portno, c_void_p(0))
         if not port:
             raise Error("Cannot bind to port")
+        sd = xseg_get_signal_desc_nonstatic(ctx, port)
+        if not sd:
+            raise Error("Cannot get signal descriptor")
         xseg_init_local_signal(ctx, portno)
         self.ctx = ctx
         self.port = port
         self.portno = portno
+        self.signal_desc = sd
 
     def __del__(self):
         return
@@ -768,7 +795,7 @@ class Xseg_ctx(object):
                 xseg_cancel_wait(self.ctx, self.portno)
                 return received
             else:
-                xseg_wait_signal(self.ctx, 10000000)
+                xseg_wait_signal_green(self.ctx, self.signal_desc, 10000000)
 
     def wait_requests(self, requests):
         while True:
index 8d48ffc..0004ac5 100644 (file)
@@ -86,7 +86,7 @@ def merkle_hash(hashes):
 
 def init():
     rnd.seed()
-#    archipelago.common.BIN_DIR=os.path.join(os.getcwd(), '../../peers/user/')
+    archipelago.common.BIN_DIR=os.path.join(os.getcwd(), '../../peers/user/')
     archipelago.common.LOGS_PATH=os.path.join(os.getcwd(), 'logs')
     archipelago.common.PIDFILE_PATH=os.path.join(os.getcwd(), 'pids')
     if not os.path.isdir(archipelago.common.LOGS_PATH):
index 5d400ab..9df4403 100644 (file)
@@ -1002,9 +1002,9 @@ int xseg_cancel_wait(struct xseg *xseg, uint32_t portno)
        return xseg->priv->peer_type.peer_ops.cancel_wait(xseg, portno);
 }
 
-int xseg_wait_signal(struct xseg *xseg, uint32_t usec_timeout)
+int xseg_wait_signal(struct xseg *xseg, void *sd, uint32_t usec_timeout)
 {
-       return xseg->priv->peer_type.peer_ops.wait_signal(xseg, usec_timeout);
+       return xseg->priv->peer_type.peer_ops.wait_signal(xseg, sd, usec_timeout);
 }
 
 int xseg_signal(struct xseg *xseg, xport portno)
@@ -1818,6 +1818,10 @@ char* xseg_get_target_nonstatic(struct xseg* xseg, struct xseg_request *req)
         return xseg_get_target(xseg, req);
 }
 
+void* xseg_get_signal_desc_nonstatic(struct xseg *xseg, struct xseg_port *port)
+{
+       return xseg_get_signal_desc(xseg, port);
+}
 
 #ifdef __KERNEL__
 #include <linux/module.h>
index 1e8a1ed..5829c41 100644 (file)
@@ -161,7 +161,7 @@ struct xseg_peer_operations {
        int   (*signal_leave)(struct xseg *xseg);
        int   (*prepare_wait)(struct xseg *xseg, uint32_t portno);
        int   (*cancel_wait)(struct xseg *xseg, uint32_t portno);
-       int   (*wait_signal)(struct xseg *xseg, uint32_t usec_timeout);
+       int   (*wait_signal)(struct xseg *xseg, void *sd, uint32_t usec_timeout);
        int   (*signal)(struct xseg *xseg, uint32_t portno);
        void *(*malloc)(uint64_t size);
        void *(*realloc)(void *mem, uint64_t size);
@@ -424,6 +424,7 @@ struct xseg_request *  xseg_accept          ( struct xseg         * xseg,
                                               uint32_t              portno    );
 
                 int    xseg_wait_signal     ( struct xseg         * xseg,
+                                             void                * sd,
                                               uint32_t              utimeout  );
 
                 int    xseg_signal          ( struct xseg         * xseg,
@@ -440,6 +441,7 @@ struct xseg_port* xseg_get_port(struct xseg *xseg, uint32_t portno);
 
 extern char* xseg_get_data_nonstatic(struct xseg* xseg, struct xseg_request *req);
 extern char* xseg_get_target_nonstatic(struct xseg* xseg, struct xseg_request *req);
+extern void* xseg_get_signal_desc_nonstatic(struct xseg *xseg, struct xseg_port *port);
 
 static inline uint32_t xseg_portno(struct xseg *xseg, struct xseg_port *port)
 {
index e5997c5..91dd777 100644 (file)
@@ -76,6 +76,7 @@ EXPORT_SYMBOL(xseg_get_allocated_requests);
 EXPORT_SYMBOL(xseg_set_freequeue_size);
 EXPORT_SYMBOL(xseg_get_data_nonstatic);
 EXPORT_SYMBOL(xseg_get_target_nonstatic);
+EXPORT_SYMBOL(xseg_get_signal_desc_nonstatic);
 
 EXPORT_SYMBOL(xseg_snprintf);
 EXPORT_SYMBOL(__xseg_errbuf);