From: Georgios D. Tsoukalas Date: Tue, 13 Dec 2011 09:47:57 +0000 (+0200) Subject: initialization from my own repository X-Git-Tag: 0.3.5~12 X-Git-Url: https://code.grnet.gr/git/archipelago/commitdiff_plain/6d486cc0e4c54f6224e7da47d84b319de3677542 initialization from my own repository --- 6d486cc0e4c54f6224e7da47d84b319de3677542 diff --git a/xseg/.gitignore b/xseg/.gitignore new file mode 100644 index 0000000..7c867c7 --- /dev/null +++ b/xseg/.gitignore @@ -0,0 +1,11 @@ +*.k.c +*.k.o +*.mod.c +*.mod.o +*.a +*.so +*.so.* +*.o +*.o.cmd +.*.ko.cmd +*.ko diff --git a/xseg/Makefile b/xseg/Makefile new file mode 100644 index 0000000..8b6b84c --- /dev/null +++ b/xseg/Makefile @@ -0,0 +1,21 @@ +.PHONY: clean + +XSEG_HOME=$(shell pwd) +export XSEG_HOME + +include config.mk +include base.mk + +default: + for f in $(MAKEDIRS); do \ + make -C $$f || break; \ + echo "-------------------------------------------------------------------"; \ + echo " "; \ + done + +clean: + for f in $(MAKEDIRS); do \ + make -C $$f clean || break; \ + echo " "; \ + done + diff --git a/xseg/base.mk b/xseg/base.mk new file mode 100644 index 0000000..0a3479d --- /dev/null +++ b/xseg/base.mk @@ -0,0 +1,15 @@ +# Default setup for subdirectory Makefiles. + +CC=gcc +ifndef MOPTS +MOPTS= +endif +ifndef COPTS +COPTS=-O2 -g -finline-functions $(MOPTS) $(DEBUG) +endif +ifndef CSTD +CSTD=-std=gnu99 -pedantic +endif +INC=-I$(BASE) +CFLAGS=-Wall $(INC) $(COPTS) $(CSTD) + diff --git a/xseg/config.mk b/xseg/config.mk new file mode 100644 index 0000000..00bb5d4 --- /dev/null +++ b/xseg/config.mk @@ -0,0 +1,8 @@ +# Host configuration + +MOPTS=-march=core2 + +# Order is significant due to dependencies. +# 'peers' should be last. +MAKEDIRS=xq drivers xseg peers sys + diff --git a/xseg/drivers/Makefile b/xseg/drivers/Makefile new file mode 100644 index 0000000..e396e0e --- /dev/null +++ b/xseg/drivers/Makefile @@ -0,0 +1,32 @@ +.PHONY: all clean xseg lib + +BASE=.. 
+ +include $(BASE)/config.mk +include $(BASE)/base.mk + +DRIVERS=xseg_posix xseg_xsegdev +DRVOBJS=$(DRIVERS:=.o) +DRVSOS=$(DRIVERS:=.so) + +all: xseg $(DRIVERS:=.so) lib + +$(BASE)/sys/xseg_user.o: + make -C $(BASE)/sys xseg_user.o + +xseg_posix.o: xseg_posix.c $(BASE)/xseg/xseg.h + $(CC) $(CFLAGS) -fPIC -c -o $@ $< + +xseg_posix.so: xseg_posix.o $(BASE)/sys/xseg_user.o + $(CC) -shared -o $@ $< $(BASE)/sys/xseg_user.o + +xseg_xsegdev.o: xseg_xsegdev.c $(BASE)/xseg/xseg.h + $(CC) $(CFLAGS) -I$(BASE)/sys -fPIC -c -o $@ $< + +xseg_xsegdev.so: xseg_xsegdev.o $(BASE)/sys/xseg_user.o + $(CC) -shared -o $@ $< $(BASE)/sys/xseg_user.o + +lib: + cp -vaf $(DRVSOS) ../lib +clean: + rm -f $(DRVOBJS) $(DRVSOS) diff --git a/xseg/drivers/xseg_posix.c b/xseg/drivers/xseg_posix.c new file mode 100644 index 0000000..eccf26e --- /dev/null +++ b/xseg/drivers/xseg_posix.c @@ -0,0 +1,215 @@ +#define _GNU_SOURCE +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#define ERRSIZE 512 +char errbuf[ERRSIZE]; + +static long posix_allocate(const char *name, uint64_t size) +{ + int fd, r; + fd = shm_open(name, O_RDWR | O_CREAT, 0770); + if (fd < 0) { + LOGMSG("Cannot create shared segment: %s\n", + strerror_r(errno, errbuf, ERRSIZE)); + return fd; + } + + r = lseek(fd, size -1, SEEK_SET); + if (r < 0) { + close(fd); + LOGMSG("Cannot seek into segment file: %s\n", + strerror_r(errno, errbuf, ERRSIZE)); + return r; + } + + errbuf[0] = 0; + r = write(fd, errbuf, 1); + if (r != 1) { + close(fd); + LOGMSG("Failed to set segment size: %s\n", + strerror_r(errno, errbuf, ERRSIZE)); + return r; + } + + close(fd); + return 0; +} + +static long posix_deallocate(const char *name) +{ + return shm_unlink(name); +} + +static void *posix_map(const char *name, uint64_t size) +{ + struct xseg *xseg; + int fd; + fd = shm_open(name, O_RDWR, 0000); + if (fd < 0) { + LOGMSG("Failed to open '%s' for mapping: %s\n", + name, 
strerror_r(errno, errbuf, ERRSIZE)); + return NULL; + } + + xseg = mmap ( XSEG_BASE_AS_PTR, + size, + PROT_READ | PROT_WRITE, + MAP_SHARED | MAP_FIXED /* | MAP_LOCKED */, + fd, 0 ); + + if (xseg == MAP_FAILED) { + LOGMSG("Could not map segment: %s\n", + strerror_r(errno, errbuf, ERRSIZE)); + /* release the descriptor on the failure path as well; errno has already been reported */ + close(fd); + return NULL; + } + + /* the descriptor is closed as soon as the mapping exists */ + close(fd); + return xseg; +} + +/* Unmap a segment. The length is read from the segment header (xseg->segment_size); the 'size' argument is not used. */ +static void posix_unmap(void *ptr, uint64_t size) +{ + struct xseg *xseg = ptr; + (void)munmap(xseg, xseg->segment_size); +} + + +/* SIGIO handler: SIGIO is blocked in posix_signal_init(), so this only reports and counts unexpected deliveries. */ +static void handler(int signum) +{ + static unsigned long counter; + printf("%lu: signal %d: this shouldn't have happend.\n", counter, signum); + counter ++; +} + +static sigset_t savedset, set; +static pid_t pid; + +/* Install the SIGIO handler, block SIGIO (saving the previous mask in 'savedset') and record the caller's thread id in 'pid'. Returns 0 on success, -1 on failure. */ +static int posix_signal_init(void) +{ + void (*h)(int); + int r; + h = signal(SIGIO, handler); + if (h == SIG_ERR) + return -1; + + sigemptyset(&set); + sigaddset(&set, SIGIO); + + r = sigprocmask(SIG_BLOCK, &set, &savedset); + if (r < 0) + return -1; + + pid = syscall(SYS_gettid); + return 0; +} + +/* Undo posix_signal_init(): default SIGIO disposition and the saved signal mask. */ +static void posix_signal_quit(void) +{ + signal(SIGIO, SIG_DFL); + sigprocmask(SIG_SETMASK, &savedset, NULL); +} + +/* Publish our thread id in the port so that posix_signal() knows whom to signal. */ +static int posix_prepare_wait(struct xseg_port *port) +{ + port->waitcue = pid; + return 0; +} + +static int posix_cancel_wait(struct xseg_port *port) +{ + port->waitcue = 0; + return 0; +} + +/* Wait for SIGIO for at most usec_timeout microseconds. Returns the signal number, or the negative sigtimedwait() result. */ +static int posix_wait_signal(struct xseg_port *port, uint32_t usec_timeout) +{ + int r; + siginfo_t siginfo; + struct timespec ts; + + ts.tv_sec = usec_timeout / 1000000; + ts.tv_nsec = 1000 * (usec_timeout - ts.tv_sec * 1000000); + + r = sigtimedwait(&set, &siginfo, &ts); + if (r < 0) + return r; + + return siginfo.si_signo; +} + +static int posix_signal(struct xseg_port *port) +{ + union sigval sigval = {0}; + pid_t cue = (pid_t)port->waitcue; + if (!cue) + return -1; + sigqueue(cue, SIGIO, sigval); + /* XXX: on error what? 
*/ + return 0; +} + +static void *posix_malloc(uint64_t size) +{ + return malloc((size_t)size); +} + +static void *posix_realloc(void *mem, uint64_t size) +{ + return realloc(mem, (size_t)size); +} + +static void posix_mfree(void *mem) +{ + free(mem); +} + +static struct xseg_type xseg_posix = { + /* xseg_operations */ + { + .malloc = posix_malloc, + .realloc = posix_realloc, + .mfree = posix_mfree, + .allocate = posix_allocate, + .deallocate = posix_deallocate, + .map = posix_map, + .unmap = posix_unmap, + }, + /* name */ + "posix" +}; + +static struct xseg_peer xseg_peer_posix = { + /* xseg_peer_operations */ + { + .signal_init = posix_signal_init, + .signal_quit = posix_signal_quit, + .prepare_wait = posix_prepare_wait, + .cancel_wait = posix_cancel_wait, + .wait_signal = posix_wait_signal, + .signal = posix_signal, + .malloc = posix_malloc, + .realloc = posix_realloc, + .mfree = posix_mfree, + }, + /* name */ + "posix" +}; + +void xseg_posix_init(void) +{ + xseg_register_type(&xseg_posix); + xseg_register_peer(&xseg_peer_posix); +} + diff --git a/xseg/drivers/xseg_xsegdev.c b/xseg/drivers/xseg_xsegdev.c new file mode 100644 index 0000000..4872359 --- /dev/null +++ b/xseg/drivers/xseg_xsegdev.c @@ -0,0 +1,209 @@ +#define _GNU_SOURCE +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#define ERRSIZE 512 +static char errbuf[ERRSIZE]; + +#define XSEG_DEVICE "/dev/xsegdev" +static int fdev = -1; + +static int opendev(void) +{ + if (fdev >= 0) + return fdev; + + fdev = open(XSEG_DEVICE, O_RDWR); + if (fdev < 0) { + LOGMSG("Cannot open %s: %s\n", XSEG_DEVICE, + strerror_r(errno, errbuf, ERRSIZE)); + close(fdev); + fdev = -1; + } + return fdev; +} + +static int closedev(void) +{ + int r; + if (fdev < 0) + return 0; + + r = close(fdev); + if (r < 0) { + LOGMSG("Cannot close %s: %s\n", XSEG_DEVICE, + strerror_r(errno, errbuf, ERRSIZE)); + return -1; + } else + fdev = -1; 
+ + return 0; +} + +/* Create the device segment with the given size, destroying any existing one first. Returns 0 on success, a negative value on failure. The 'name' argument is not used. */ +static long xsegdev_allocate(const char *name, uint64_t size) +{ + int fd; + long oldsize; + + fd = opendev(); + if (fd < 0) + return fd; + + oldsize = ioctl(fd, XSEGDEV_IOC_SEGSIZE, 0); + if (oldsize >= 0) { + LOGMSG("Destroying old segment\n"); + if (ioctl(fd, XSEGDEV_IOC_DESTROYSEG, 0)) { + LOGMSG("Failed to destroy old segment"); + closedev(); + return -2; + } + } + + if (ioctl(fd, XSEGDEV_IOC_CREATESEG, size)) { + LOGMSG("Failed to create segment"); + closedev(); + return -3; + } + + return 0; +} + +/* Destroy the device segment. Returns 0 on success, -1 if the device cannot be opened, -2 if the destroy ioctl fails. The 'name' argument is not used. */ +static long xsegdev_deallocate(const char *name) +{ + int fd; + fd = open(XSEG_DEVICE, O_RDWR); + if (fd < 0) { + LOGMSG("Cannot open %s: %s\n", XSEG_DEVICE, + strerror_r(errno, errbuf, ERRSIZE)); + return -1; + } + + if (ioctl(fd, XSEGDEV_IOC_DESTROYSEG, 0)) { + LOGMSG("Failed to destroy old segment"); + close(fd); + return -2; + } + + /* 'fd' is private to this call and must be closed here: closedev() only closes the cached 'fdev' */ + close(fd); + closedev(); + return 0; +} + +/* Map the device segment at XSEG_BASE_AS_PTR. Returns the mapping, or NULL on failure. The 'name' argument is not used. */ +static void *xsegdev_map(const char *name, uint64_t size) +{ + struct xseg *xseg; + int fd; + fd = opendev(); + if (fd < 0) + return NULL; + + xseg = mmap ( XSEG_BASE_AS_PTR, + size, + PROT_READ | PROT_WRITE, + MAP_SHARED | MAP_FIXED /* | MAP_LOCKED */, + fd, 0 ); + + if (xseg == MAP_FAILED) { + LOGMSG("Could not map segment: %s\n", + strerror_r(errno, errbuf, ERRSIZE)); + closedev(); + return NULL; + } + + return xseg; +} + +/* Unmap a segment; the length is read from xseg->segment_size, the 'size' argument is not used. */ +static void xsegdev_unmap(void *ptr, uint64_t size) +{ + struct xseg *xseg = ptr; + (void)munmap(xseg, xseg->segment_size); +} + + +static struct xseg_type xseg_xsegdev = { + /* xseg_operations */ + { + .malloc = malloc, + .mfree = free, + .allocate = xsegdev_allocate, + .deallocate = xsegdev_deallocate, + .map = xsegdev_map, + .unmap = xsegdev_unmap + }, + /* name */ + "xsegdev" +}; + +static int xsegdev_signal_init(void) +{ + return 0; +} + +static void xsegdev_signal_quit(void) { } + +static int xsegdev_prepare_wait(struct xseg_port *port) +{ + return -1; +} + +static int xsegdev_cancel_wait(struct xseg_port *port) +{ + return -1; +} + +static int 
xsegdev_wait_signal(struct xseg_port *port, uint32_t timeout) +{ + return -1; +} + +static int xsegdev_signal(struct xseg_port *port) +{ + return write(opendev(), NULL, 0); +} + +static void *xsegdev_malloc(uint64_t size) +{ + return NULL; +} + +static void *xsegdev_realloc(void *mem, uint64_t size) +{ + return NULL; +} + +static void xsegdev_mfree(void *mem) { } + +static struct xseg_peer xseg_peer_xsegdev = { + /* xseg signal operations */ + { + .signal_init = xsegdev_signal_init, + .signal_quit = xsegdev_signal_quit, + .prepare_wait = xsegdev_prepare_wait, + .cancel_wait = xsegdev_cancel_wait, + .wait_signal = xsegdev_wait_signal, + .signal = xsegdev_signal, + .malloc = xsegdev_malloc, + .realloc = xsegdev_realloc, + .mfree = xsegdev_mfree + }, + /* name */ + "xsegdev" +}; + +void xseg_xsegdev_init(void) +{ + xseg_register_type(&xseg_xsegdev); + xseg_register_peer(&xseg_peer_xsegdev); +} + diff --git a/xseg/lib/.gitignore b/xseg/lib/.gitignore new file mode 100644 index 0000000..ed637b6 --- /dev/null +++ b/xseg/lib/.gitignore @@ -0,0 +1,4 @@ +*.o +*.a +*.so +lib*.so.* diff --git a/xseg/lib/README b/xseg/lib/README new file mode 100644 index 0000000..86e2ce7 --- /dev/null +++ b/xseg/lib/README @@ -0,0 +1,3 @@ +Make files in other directories will send their +libraries and plugins in this directory. +However, they will never clean it up. diff --git a/xseg/peers/.gitignore b/xseg/peers/.gitignore new file mode 100644 index 0000000..42b5036 --- /dev/null +++ b/xseg/peers/.gitignore @@ -0,0 +1,2 @@ +blockd +xseg diff --git a/xseg/peers/Makefile b/xseg/peers/Makefile new file mode 100644 index 0000000..3f33705 --- /dev/null +++ b/xseg/peers/Makefile @@ -0,0 +1,17 @@ +.PHONY: all clean + +BASE=.. 
+ +include $(BASE)/config.mk +include $(BASE)/base.mk + +all: blockd xseg + +blockd: blockd.c $(BASE)/xseg/xseg.h + $(CC) $(CFLAGS) -o $@ $< -I$(BASE) -L$(BASE)/lib -lxseg + +xseg: xseg-tool.c $(BASE)/xseg/xseg.h + $(CC) $(CFLAGS) -o $@ $< -L$(BASE)/lib -lxseg + +clean: + rm -f blockd xseg diff --git a/xseg/peers/blockd.c b/xseg/peers/blockd.c new file mode 100644 index 0000000..0bfe05f --- /dev/null +++ b/xseg/peers/blockd.c @@ -0,0 +1,437 @@ +#define _GNU_SOURCE +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include + +static int usage(void) +{ + printf("Usage: ./blockd [options]\n" + "Options: [-p portno]\n" + " [-s image size in bytes]\n" + " [-g type:name:nr_ports:nr_requests:request_size:extra_size:page_shift]\n" + " [-n nr_parallel_ops]\n"); + return 1; +} + +struct io { + struct aiocb cb; + struct xseg_request *req; + ssize_t retval; +}; + +struct store { + struct xseg *xseg; + struct xseg_port *xport; + uint32_t portno; + int fd; + uint64_t size; + struct io *ios; + struct xq free_ops; + char *free_bufs; + struct xq pending_ops; + char *pending_bufs; + long nr_ops; + struct sigevent sigevent; +}; + +static unsigned long sigaction_count; + +static void sigaction_handler(int sig, siginfo_t *siginfo, void *arg) +{ + sigaction_count ++; +} + +static struct io *alloc_io(struct store *store) +{ + xqindex idx = xq_pop_head(&store->free_ops); + if (idx == None) + return NULL; + return store->ios + idx; +} + +static inline void free_io(struct store *store, struct io *io) +{ + xqindex idx = io - store->ios; + io->req = NULL; + xq_append_head(&store->free_ops, idx); +} + +static inline void pending_io(struct store *store, struct io *io) +{ + xqindex idx = io - store->ios; + xq_append_head(&store->pending_ops, idx); +} + +static inline struct io *get_pending_io(struct store *store) +{ + xqindex idx = xq_pop_head(&store->pending_ops); + if (idx == None) + return NULL; + return store->ios + idx; +} + +static void 
log_io(char *msg, struct io *io) +{ + char name[64], data[64]; + strncpy(name, io->req->name, 63); + name[63] = 0; + /* strncpy(data, io->req->data, 63); */ + /* the data copy above is disabled, so make 'data' an empty string instead of printing uninitialized bytes */ + data[0] = 0; + data[63] = 0; + printf("%s: fd:%u, op:%u %llu:%lu retval: %lu, reqstate: %u\n" + "name[%u]:'%s', data[%llu]:\n%s------------------\n\n", + msg, + (unsigned int)io->cb.aio_fildes, + (unsigned int)io->req->op, + (unsigned long long)io->cb.aio_offset, + (unsigned long)io->cb.aio_nbytes, + (unsigned long)io->retval, + (unsigned int)io->req->state, + (unsigned int)io->req->namesize, name, + (unsigned long long)io->req->datasize, data); +} + +/* Mark the request as served, send it back to its originating port, signal that port and recycle the io slot. */ +static void complete(struct store *store, struct io *io) +{ + struct xseg_request *req = io->req; + req->state |= XS_SERVED; + log_io("complete", io); + xseg_respond(store->xseg, req->portno, req); + xseg_signal(store->xseg, req->portno); + free_io(store, io); +} + +/* Same as complete(), but flags the request with XS_ERROR instead of XS_SERVED. */ +static void fail(struct store *store, struct io *io) +{ + struct xseg_request *req = io->req; + req->state |= XS_ERROR; + log_io("fail", io); + xseg_respond(store->xseg, req->portno, req); + xseg_signal(store->xseg, req->portno); + free_io(store, io); +} + +/* Mark the request as pending and queue the io on the pending list. */ +static void pending(struct store *store, struct io *io) +{ + io->req->state = XS_PENDING; + pending_io(store, io); +} + +static void handle_unknown(struct store *store, struct io *io) +{ + struct xseg_request *req = io->req; + snprintf(req->data, req->datasize, "unknown request op"); + fail(store, io); +} + +/* Point the aio control block at the store's file and completion-notification settings. */ +static inline void prepare_io(struct store *store, struct io *io) +{ + io->cb.aio_fildes = store->fd; + io->cb.aio_sigevent = store->sigevent; + /* cb->aio_sigevent.sigev_value.sival_int = fd; */ +} + +static void handle_read_write(struct store *store, struct io *io) +{ + int r; + struct xseg_request *req = io->req; + struct aiocb *cb = &io->cb; + + if (req->state != XS_ACCEPTED) { + if (io->retval > 0) + req->serviced += io->retval; + else + req->datasize = req->serviced; + + if (req->serviced >= req->datasize) { + complete(store, io); + return; + } + } + + 
if (req != io->req) + printf("0.%p vs %p!\n", (void *)req, (void *)io->req); + if (!req->size) { + if (req->flags & (XSEG_FLUSH | XSEG_FUA)) { + /* for now, no FLUSH/FUA support. + * note that with FLUSH/size == 0 + * there will probably be a (uint64_t)-1 offset */ + complete(store, io); + return; + } else { + complete(store, io); + return; + } + } + + prepare_io(store, io); + cb->aio_buf = req->data; + cb->aio_nbytes = req->datasize - req->serviced; + cb->aio_offset = req->offset + req->serviced; + + switch (req->op) { + case X_READ: + r = aio_read(cb); + break; + case X_WRITE: + r = aio_write(cb); + break; + default: + snprintf(req->data, req->datasize, + "wtf, corrupt op %u?\n", req->op); + fail(store, io); + return; + } + + if (r) { + strerror_r(errno, req->data, req->datasize); + fail(store, io); + return; + } + + pending(store, io); +} + +static void dispatch(struct store *store, struct io *io) +{ + switch (io->req->op) { + case X_READ: + case X_WRITE: + handle_read_write(store, io); break; + case X_SYNC: + default: + handle_unknown(store, io); + } +} + +static void handle_pending(struct store *store, struct io *io) +{ + int r = aio_error(&io->cb); + if (r == EINPROGRESS) { + pending(store, io); + return; + } + + io->retval = aio_return(&io->cb); + if (r) { + fail(store, io); + return; + } + + dispatch(store, io); +} + +static void handle_accepted(struct store *store, struct io *io) +{ + struct xseg_request *req = io->req; + req->serviced = 0; + req->state = XS_ACCEPTED; + io->retval = 0; + dispatch(store, io); +} + +static int blockd_loop(struct store *store) +{ + struct xseg *xseg = store->xseg; + uint32_t portno = store->portno; + struct io *io; + struct xseg_request *accepted; + + for (;;) { + accepted = NULL; + xseg_prepare_wait(xseg, portno); + io = alloc_io(store); + if (io) { + accepted = xseg_accept(xseg, portno); + if (accepted) { + xseg_cancel_wait(xseg, portno); + io->req = accepted; + handle_accepted(store, io); + } else + free_io(store, io); + } 
+ + io = get_pending_io(store); + if (io) { + xseg_cancel_wait(xseg, portno); + handle_pending(store, io); + } + + if (!io && !accepted) + xseg_wait_signal(xseg, portno, 10000); + } + + return 0; +} + +/* Join the segment described by 'spec'; if joining fails, try to create it and join again. */ +static struct xseg *join(char *spec) +{ + struct xseg_config config; + struct xseg *xseg; + + (void)xseg_parse_spec(spec, &config); + xseg = xseg_join(config.type, config.name); + if (xseg) + return xseg; + + (void)xseg_create(&config); + return xseg_join(config.type, config.name); +} + +/* Open (or, when a size is given, create) the image at 'path', set up SIGIO-based aio notification and the io queues, bind to the xseg port and run the service loop. */ +static int blockd(char *path, unsigned long size, uint32_t nr_ops, + char *spec, long portno) +{ + struct stat stat; + struct sigaction sa; + struct store *store; + int r; + + store = malloc(sizeof(struct store)); + if (!store) { + perror("malloc"); + return -1; + } + + store->fd = open(path, O_RDWR); + while (store->fd < 0) { + if (errno == ENOENT && size) + store->fd = open(path, O_RDWR | O_CREAT, 0600); + if (store->fd >= 0) + break; + perror(path); + return store->fd; + } + + /* the current file size is needed both as the default image size and to decide whether the file must grow */ + r = fstat(store->fd, &stat); + if (r < 0) { + perror(path); + return r; + } + + if (size == 0) { + size = stat.st_size; + if (size == 0) { + fprintf(stderr, "size cannot be zero\n"); + return -1; + } + } + + /* Grow the image to 'size' by writing a single zero byte at its last offset. + * This is skipped when the file is already large enough, so existing image + * data is never overwritten (the old code always wrote a byte of 'r' there). */ + if ((unsigned long)stat.st_size < size) { + char zero = 0; + if (lseek(store->fd, size-1, SEEK_SET) < 0 || + write(store->fd, &zero, 1) != 1) { + perror("write"); + return -1; + } + } + + /* + r = daemon(1, 1); + if (r < 0) + return r; + */ + + store->sigevent.sigev_notify = SIGEV_SIGNAL; + store->sigevent.sigev_signo = SIGIO; + sa.sa_sigaction = sigaction_handler; + sa.sa_flags = SA_SIGINFO; + if (sigemptyset(&sa.sa_mask)) + perror("sigemptyset"); + + if (sigaction(SIGIO, &sa, NULL)) { + perror("sigaction"); + return -1; + } + + store->nr_ops = nr_ops; + store->free_bufs = calloc(nr_ops, sizeof(xqindex)); + if (!store->free_bufs) + goto malloc_fail; + + store->pending_bufs = calloc(nr_ops, sizeof(xqindex)); + if (!store->pending_bufs) + goto malloc_fail; + + store->ios = calloc(nr_ops, sizeof(struct io)); + if (!store->ios) { +malloc_fail: + perror("malloc"); + return -1; + } + 
+ + xq_init_seq(&store->free_ops, nr_ops, nr_ops, store->free_bufs); + xq_init_empty(&store->pending_ops, nr_ops, store->pending_bufs); + + if (xseg_initialize("posix")) { + printf("cannot initialize library\n"); + return -1; + } + store->xseg = join(spec); + if (!store->xseg) + return -1; + + store->xport = xseg_bind_port(store->xseg, portno); + if (!store->xport) { + printf("cannot bind to port %ld\n", portno); + return -1; + } + + store->portno = xseg_portno(store->xseg, store->xport); + printf("blockd on port %u/%u\n", + store->portno, store->xseg->config.nr_ports); + + return blockd_loop(store); +} + +/* Entry point: argv[1] is the image path, followed by the options listed in usage() (-g spec, -s size, -p portno, -n nr_parallel_ops). nr_ops falls back to 16 when not given. */ +int main(int argc, char **argv) +{ + char *path, *spec = ""; + unsigned long size; + int i; + long portno; + uint32_t nr_ops; + + if (argc < 2) + return usage(); + + path = argv[1]; + size = 0; + portno = -1; + nr_ops = 0; + + for (i = 2; i < argc; i++) { + if (!strcmp(argv[i], "-g") && i + 1 < argc) { + spec = argv[i+1]; + i += 1; + continue; + } + + if (!strcmp(argv[i], "-s") && i + 1 < argc) { + size = strtoul(argv[i+1], NULL, 10); + i += 1; + continue; + } + + if (!strcmp(argv[i], "-p") && i + 1 < argc) { + portno = strtoul(argv[i+1], NULL, 10); + i += 1; + continue; + } + + /* "-n" as documented in usage(); this branch used to test "-p" a second time and was unreachable */ + if (!strcmp(argv[i], "-n") && i + 1 < argc) { + nr_ops = strtoul(argv[i+1], NULL, 10); + i += 1; + continue; + } + } + + if (nr_ops <= 0) + nr_ops = 16; + + return blockd(path, size, nr_ops, spec, portno); +} + diff --git a/xseg/peers/xseg-tool.c b/xseg/peers/xseg-tool.c new file mode 100644 index 0000000..4ba42d1 --- /dev/null +++ b/xseg/peers/xseg-tool.c @@ -0,0 +1,1047 @@ +#define _GNU_SOURCE +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include + +int help(void) +{ + printf("xseg [[[]:[]] [ ]* ]*\n" + "spec:\n" + " \n" + "global commands:\n" + " reportall\n" + " create\n" + " destroy\n" + " bind \n" + " signal \n" + " bridge {full|summary|stats}\n" + "port commands:\n" + " report\n" + " alloc_requests (to source) \n" + " free_requests 
(from source) \n" + " put_requests (all from dest)\n" + " put_replies (all from dest)\n" + " wait \n" + " complete \n" + " fail \n" + " rndwrite \n" + " rndread \n" + " info \n" + " read \n" + " write < data\n" + " truncate \n" + " delete \n" + " acquire \n" + " release \n" + " copy \n" + " clone \n" + ); + return 1; +} + +char *namebuf; +char *chunk; +struct xseg_config cfg; +struct xseg *xseg; +uint32_t srcport, dstport; + + +#define mkname mkname_heavy +/* heavy distributes duplicates much more widely than light + * ./xseg-tool random 100000 | cut -d' ' -f2- | sort | uniq -d -c |wc -l + */ + +/* Fill name[0..namesize) with characters from '0'-'9' / 'a'-'f' derived from 'seed'; the name is not NUL-terminated here. */ +void mkname_heavy(char *name, uint32_t namesize, uint32_t seed) +{ + int i; + char c; + for (i = 0; i < namesize; i += 1) { + c = seed + (seed >> 8) + (seed >> 16) + (seed >> 24); + c = '0' + ((c + (c >> 4)) & 0xf); + if (c > '9') + c += 'a'-'0'-10; + name[i] = c; + seed *= ((seed % 137911) | 1) * 137911; + } +} + +/* Fill name[0..namesize) with letters 'A'..'P' taken from the low four bits of consecutive seed values. */ +void mkname_light(char *name, uint32_t namesize, uint32_t seed) +{ + int i; + char c; + for (i = 0; i < namesize; i += 1) { + c = seed; + name[i] = 'A' + (c & 0xf); + seed += 1; + } +} + +/* NOTE(review): 'size' is not used and random() may return 0, which makes the division yield infinity - confirm intent before enabling the commented-out callers. */ +uint64_t pick(uint64_t size) +{ + return (uint64_t)((double)(RAND_MAX) / random()); +} + +/* Fill 'chunk' with repetitions of a (namesize + 16)-byte pattern built from the hex offset and the name; a final partial repetition covers the last datasize % bufsize bytes. */ +void mkchunk( char *chunk, uint32_t datasize, + char *name, uint32_t namesize, uint64_t offset) +{ + long i, r, bufsize = namesize + 16; + char buf[bufsize]; + r = datasize % bufsize; + snprintf(buf, bufsize, "%016llx%s", (unsigned long long)offset, name); + + for (i = 0; i <= (long)datasize - bufsize; i += bufsize) + memcpy(chunk + i, buf, bufsize); + + memcpy(chunk + datasize - r, buf, r); +} + +/* Check that 'chunk' holds the pattern mkchunk() produces for the same name and offset. */ +int chkchunk( char *chunk, uint32_t datasize, + char *name, uint32_t namesize, uint64_t offset) +{ + long i, r; + int bufsize = namesize + 16; + char buf[bufsize]; + /* the tail length must be computed exactly as in mkchunk(): modulo the pattern size, not the name size (which may also be zero) */ + r = datasize % bufsize; + snprintf(buf, bufsize, "%016llx%s", (unsigned long long)offset, name); + + for (i = 0; i <= (long)datasize - bufsize; i += bufsize) + if (memcmp(chunk + i, buf, bufsize)) { + /*printf("mismatch: '%*s'* vs 
'%*s'\n", + bufsize, buf, datasize, chunk); + */ + return 0; + } + + if (memcmp(chunk + datasize - r, buf, r)) + return 0; + + return 1; +} + + +#define ALLOC_MIN 4096 +#define ALLOC_MAX 1048576 + +void inputbuf(FILE *fp, char **retbuf, uint64_t *retsize) +{ + static uint64_t alloc_size; + static char *buf; + uint64_t size = 0; + char *p; + size_t r; + + if (alloc_size < ALLOC_MIN) + alloc_size = ALLOC_MIN; + + if (alloc_size > ALLOC_MAX) + alloc_size = ALLOC_MAX; + + p = realloc(buf, alloc_size); + if (!p) { + if (buf) + free(buf); + buf = NULL; + goto out; + } + + buf = p; + + while (!feof(fp)) { + r = fread(buf + size, 1, alloc_size - size, fp); + if (!r) + break; + size += r; + if (size >= alloc_size) { + p = realloc(buf, alloc_size * 2); + if (!p) { + if (buf) + free(buf); + buf = NULL; + size = 0; + goto out; + } + buf = p; + alloc_size *= 2; + } + } + +out: + *retbuf = buf; + *retsize = size; +} + +void report_request(struct xseg_request *req) +{ + uint32_t max = req->datasize; + if (max > 128) + max = 128; + req->data[max-1] = 0; + printf("request %llu state %u\n", (unsigned long long)req->serial, req->state); + printf("data: %s\n", req->data); +} + +int cmd_info(char *name) +{ + return 0; +} + +int cmd_read(char *name, uint64_t offset, uint64_t size) +{ + uint32_t namesize = strlen(name); + int r; + xserial srl; + struct xseg_request *req = xseg_get_request(xseg, srcport); + if (!req) { + printf("No request\n"); + return -1; + } + + r = xseg_prep_request(req, namesize, size); + if (r < 0) { + printf("Cannot prepare request! 
(%lu, %llu)\n", + (unsigned long)namesize, (unsigned long long)size); + xseg_put_request(xseg, srcport, req); + return -1; + } + + strncpy(req->name, name, namesize); + req->offset = offset; + req->size = size; + req->op = X_READ; + + srl = xseg_submit(xseg, dstport, req); + if (srl == None) + return -1; + + xseg_signal(xseg, dstport); + return 0; +} + +int cmd_write(char *name, uint64_t offset) +{ + char *buf = NULL; + int r; + xserial srl; + uint64_t size = 0; + uint32_t namesize = strlen(name); + struct xseg_request *req; + + inputbuf(stdin, &buf, &size); + if (!size) { + printf("No input\n"); + return -1; + } + + req = xseg_get_request(xseg, srcport); + if (!req) { + printf("No request\n"); + return -1; + } + + r = xseg_prep_request(req, namesize, size); + if (r < 0) { + printf("Cannot prepare request! (%lu, %llu)\n", + (unsigned long)namesize, (unsigned long long)size); + xseg_put_request(xseg, srcport, req); + return -1; + } + + strncpy(req->name, name, namesize); + memcpy(req->buffer, buf, size); + req->offset = offset; + req->size = size; + req->op = X_WRITE; + + srl = xseg_submit(xseg, dstport, req); + if (srl == None) { + printf("Cannot submit\n"); + return -1; + } + + return 0; +} + +int cmd_truncate(char *name, uint64_t offset) +{ + return 0; +} + +int cmd_delete(char *name) +{ + return 0; +} + +int cmd_acquire(char *name) +{ + return 0; +} + +int cmd_release(char *name) +{ + return 0; +} + +int cmd_copy(char *src, char *dst) +{ + return 0; +} + +int cmd_clone(char *src, char *dst) +{ + return 0; +} + +void log_req( uint32_t portno2, uint32_t portno1, int op, int method, + struct xseg_request *req) +{ + return; +} + +#define LOG_ACCEPT 0 +#define LOG_RECEIVE 1 + +int cmd_bridge(uint32_t portno1, uint32_t portno2, char *logfile, char *how) +{ + struct xseg_request *req; + int logfd, method; + if (!strcmp(logfile, "-")) + logfd = 1; + else { + logfd = open(logfile, O_WRONLY|O_APPEND|O_CREAT, 0600); + if (logfd < 0) { + perror(logfile); + return -1; + } + 
} + + /* logging method: 0 for "full", 1 for "summary", 2 for anything else; the second test used to repeat "full", so 1 was never selected */ + if (!strcmp(how, "full")) + method = 0; + else if (!strcmp(how, "summary")) + method = 1; + else + method = 2; + + for (;;) { + int reloop = 0, active; + xseg_prepare_wait(xseg, portno1); + xseg_prepare_wait(xseg, portno2); + req = NULL; + + /* shuttle requests and replies between the two ports until neither side has anything left */ + for (;;) { + active = 0; + + req = xseg_accept(xseg, portno1); + if (req) { + xseg_submit(xseg, portno2, req); + log_req(portno1, portno2, LOG_ACCEPT, method, req); + active += 1; + } + + req = xseg_accept(xseg, portno2); + if (req) { + xseg_submit(xseg, portno1, req); + log_req(portno2, portno1, LOG_ACCEPT, method, req); + active += 1; + } + + req = xseg_receive(xseg, portno1); + if (req) { + xseg_respond(xseg, portno2, req); + log_req(portno1, portno2, LOG_RECEIVE, method, req); + active += 1; + } + + req = xseg_receive(xseg, portno2); + if (req) { + xseg_respond(xseg, portno1, req); + log_req(portno2, portno1, LOG_RECEIVE, method, req); + active += 1; + } + + if (active == 0) { + if (reloop) + break; + /* wait on multiple queues? */ + xseg_wait_signal(xseg, portno1, 100000); + break; + } else { + xseg_cancel_wait(xseg, portno1); + xseg_cancel_wait(xseg, portno2); + reloop = 1; + } + } + } + + return 0; +} + +/* Submit 'loops' X_WRITE requests with generated names and chunk contents from srcport to dstport, and count the replies. */ +int cmd_rndwrite(long loops, int32_t seed, uint32_t namesize, uint32_t chunksize, uint64_t size) +{ + if (loops < 0) + return help(); + + if (namesize >= chunksize) { + printf("namesize >= chunksize\n"); + return -1; + } + + char *p = realloc(namebuf, namesize+1); + if (!p) { + printf("Cannot allocate memory\n"); + return -1; + } + namebuf = p; + + p = realloc(chunk, chunksize); + if (!p) { + printf("Cannot allocate memory\n"); + return -1; + } + chunk = p; + memset(chunk, 0, chunksize); + + srandom(seed); + + struct xseg_request *submitted = NULL, *received; + long nr_submitted = 0, nr_received = 0, nr_failed = 0; + int reported = 0, r; + uint64_t offset; + xserial srl; + + for (;;) { + /* reset per iteration, as cmd_rndread does: a stale pointer would keep the loop from ever waiting once all requests are submitted */ + submitted = NULL; + xseg_prepare_wait(xseg, srcport); + if (nr_submitted < loops && + (submitted = xseg_get_request(xseg, srcport))) { + 
xseg_cancel_wait(xseg, srcport); + r = xseg_prep_request(submitted, namesize, chunksize); + if (r < 0) { + printf("Cannot prepare request! (%u, %u)\n", + namesize, chunksize); + xseg_put_request(xseg, submitted->portno, submitted); + return -1; + } + + nr_submitted += 1; + reported = 0; + seed = random(); + mkname(namebuf, namesize, seed); + namebuf[namesize] = 0; + //printf("%ld: %s\n", nr_submitted, namebuf); + strncpy(submitted->name, namebuf, namesize); + offset = 0;// pick(size); + mkchunk(submitted->buffer, chunksize, namebuf, namesize, offset); + + submitted->offset = offset; + submitted->size = chunksize; + submitted->op = X_WRITE; + submitted->flags |= XF_NOSYNC; + + srl = xseg_submit(xseg, dstport, submitted); + (void)srl; + xseg_signal(xseg, dstport); + } + + received = xseg_receive(xseg, srcport); + if (received) { + xseg_cancel_wait(xseg, srcport); + nr_received += 1; + if (!(received->state & XS_SERVED)) { + nr_failed += 1; + report_request(received); + } + if (xseg_put_request(xseg, received->portno, received)) + printf("Cannot put request at port %u\n", received->portno); + } + + if (!submitted && !received) + xseg_wait_signal(xseg, srcport, 1000000); + + if (nr_submitted % 1000 == 0 && !reported) { + reported = 1; + printf("submitted %ld, received %ld, failed %ld\n", + nr_submitted, nr_received, nr_failed); + } + + if (nr_received >= loops) + break; + } + + printf("submitted %ld, received %ld, failed %ld\n", + nr_submitted, nr_received, nr_failed); + return 0; +} + +/* note: + * prepare/wait rhythm, + * files are converted to independent chunk access patterns, +*/ + +int cmd_rndread(long loops, int32_t seed, uint32_t namesize, uint32_t chunksize, uint64_t size) +{ + if (loops < 0) + return help(); + + if (namesize >= chunksize) { + printf("namesize >= chunksize\n"); + return -1; + } + + char *p = realloc(namebuf, namesize+1); + if (!p) { + printf("Cannot allocate memory\n"); + return -1; + } + namebuf = p; + + p = realloc(chunk, chunksize); + if 
(!p) { + printf("Cannot allocate memory\n"); + return -1; + } + chunk = p; + memset(chunk, 0, chunksize); + + srandom(seed); + + struct xseg_request *submitted = NULL, *received; + long nr_submitted = 0, nr_received = 0, nr_failed = 0, nr_mismatch = 0; + int reported = 0, r; + uint64_t offset; + xserial srl; + + for (;;) { + submitted = NULL; + xseg_prepare_wait(xseg, srcport); + if (nr_submitted < loops && + (submitted = xseg_get_request(xseg, srcport))) { + xseg_cancel_wait(xseg, srcport); + r = xseg_prep_request(submitted, namesize, chunksize); + if (r < 0) { + printf("Cannot prepare request! (%u, %u)\n", + namesize, chunksize); + xseg_put_request(xseg, submitted->portno, submitted); + return -1; + } + + nr_submitted += 1; + reported = 0; + seed = random(); + mkname(namebuf, namesize, seed); + namebuf[namesize] = 0; + //printf("%ld: %s\n", nr_submitted, namebuf); + offset = 0;//pick(size); + + strncpy(submitted->name, namebuf, namesize); + submitted->offset = offset; + submitted->size = chunksize; + submitted->op = X_READ; + + srl = xseg_submit(xseg, dstport, submitted); + (void)srl; + xseg_signal(xseg, dstport); + } + + received = xseg_receive(xseg, srcport); + if (received) { + xseg_cancel_wait(xseg, srcport); + nr_received += 1; + if (!(received->state & XS_SERVED)) { + nr_failed += 1; + report_request(received); + } else if (!chkchunk(received->data, received->datasize, + received->name, received->namesize, received->offset)) { + nr_mismatch += 1; + } + + if (xseg_put_request(xseg, received->portno, received)) + printf("Cannot put request at port %u\n", received->portno); + } + + if (!submitted && !received) + xseg_wait_signal(xseg, srcport, 1000000); + + if (nr_submitted % 1000 == 0 && !reported) { + reported = 1; + printf("submitted %ld, received %ld, failed %ld, mismatched %ld\n", + nr_submitted, nr_received, nr_failed, nr_mismatch); + } + + if (nr_received >= loops) + break; + } + + printf("submitted %ld, received %ld, failed %ld, mismatched %ld\n", + 
nr_submitted, nr_received, nr_failed, nr_mismatch); + return 0; +} + +int cmd_report(uint32_t port) +{ + struct xq *fq, *rq, *pq; + fq = &xseg->ports[port].free_queue; + rq = &xseg->ports[port].request_queue; + pq = &xseg->ports[port].reply_queue; + printf("port %u:\n" + " free_queue [%p] count : %u\n" + " request_queue [%p] count : %u\n" + " reply_queue [%p] count : %u\n", + port, + (void *)fq, xq_count(fq), + (void *)rq, xq_count(rq), + (void *)pq, xq_count(pq)); + return 0; +} + +int cmd_join(void) +{ + if (xseg) + return 0; + + xseg = xseg_join(cfg.type, cfg.name); + if (!xseg) { + printf("cannot join segment!\n"); + return -1; + } + return 0; +} + +int cmd_reportall(void) +{ + uint32_t t; + + if (cmd_join()) + return -1; + + printf("global free requests: %u\n", xq_count(xseg->free_requests)); + for (t = 0; t < xseg->config.nr_ports; t++) + cmd_report(t); + + return 0; +} + +int cmd_create(void) +{ + int r = xseg_create(&cfg); + if (r) { + printf("cannot create segment!\n"); + return -1; + } + + printf("Segment initialized.\n"); + return 0; +} + +int cmd_destroy(void) +{ + if (!xseg && cmd_join()) + return -1; + xseg_destroy(xseg); + xseg = NULL; + printf("Segment destroyed.\n"); + return 0; +} + +int cmd_alloc_requests(unsigned long nr) +{ + return xseg_alloc_requests(xseg, srcport, nr); +} + +int cmd_free_requests(unsigned long nr) +{ + return xseg_free_requests(xseg, srcport, nr); +} + +int cmd_put_requests(void) +{ + struct xseg_request *req; + + for (;;) { + req = xseg_accept(xseg, dstport); + if (!req) + break; + if (xseg_put_request(xseg, req->portno, req)) + printf("Cannot put request at port %u\n", req->portno); + } + + return 0; +} + +int cmd_finish(unsigned long nr, int fail) +{ + struct xseg_request *req; + + for (; nr--;) { + req = xseg_accept(xseg, srcport); + if (!req) + break; + if (fail) + req->state &= ~XS_SERVED; + else + req->state |= XS_SERVED; + xseg_respond(xseg, dstport, req); + xseg_signal(xseg, dstport); + } + + return 0; +} + +void 
handle_reply(struct xseg_request *req) +{ + if (!(req->state & XS_SERVED)) { + report_request(req); + goto put; + } + + switch (req->op) { + case X_READ: + fwrite(req->data, 1, req->datasize, stdout); + break; + + case X_WRITE: + case X_SYNC: + case X_DELETE: + case X_TRUNCATE: + case X_COMMIT: + case X_CLONE: + default: + break; + } + +put: + if (xseg_put_request(xseg, req->portno, req)) + fprintf(stderr, "Cannot put reply at port %u\n", req->portno); +} + +int cmd_wait(uint32_t nr) +{ + struct xseg_request *req; + long ret; + + for (;;) { + req = xseg_receive(xseg, srcport); + if (req) { + handle_reply(req); + nr--; + if (nr == 0) + break; + continue; + } + + ret = xseg_prepare_wait(xseg, srcport); + if (ret) + return -1; + + ret = xseg_wait_signal(xseg, srcport, 1000000); + ret = xseg_cancel_wait(xseg, srcport); + if (ret) + return -1; + } + + return 0; +} + +int cmd_put_replies(void) +{ + struct xseg_request *req; + + for (;;) { + req = xseg_receive(xseg, dstport); + if (!req) + break; + printf("request: %08llx%08llx\n" + " op: %u\n" + " state: %u\n", + 0LL, (unsigned long long)req->serial, + req->op, + req->state); + report_request(req); + + //fwrite(req->buffer, 1, req->buffersize, stdout); + + if (xseg_put_request(xseg, req->portno, req)) + fprintf(stderr, "Cannot put reply\n"); + } + + return 0; +} + +int cmd_bind(long portno) +{ + struct xseg_port *port = xseg_bind_port(xseg, portno); + if (!port) { + printf("failed to bind port %ld\n", portno); + return 1; + } + + printf("bound port %u\n", xseg_portno(xseg, port)); + return 0; +} + +int cmd_signal(uint32_t portno) +{ + return xseg_signal(xseg, portno); +} + +int parse_ports(char *str) +{ + int ret = 0; + char *s = str; + + for (;;) { + if (*s == 0) + return 0; + + if (*s == ':') { + *s = 0; + if ((s > str) && isdigit(str[0])) { + srcport = atol(str); + ret ++; + } + break; + } + s ++; + } + + s += 1; + str = s; + + for (;;) { + if (*s == 0) { + if ((s > str) && isdigit(str[0])) { + dstport = atol(str); + 
ret ++; + } + break; + } + s ++; + } + + return ret; +} + +int main(int argc, char **argv) { + + int i, ret = 0; + char *spec; + + if (argc < 3) + return help(); + + srcport = -1; + dstport = -1; + spec = argv[1]; + + if (xseg_parse_spec(spec, &cfg)) { + printf("Cannot parse spec\n"); + return -1; + } + + if (xseg_initialize("posix")) { + printf("cannot initialize!\n"); + return -1; + } + + for (i = 2; i < argc; i++) { + + if (!strcmp(argv[i], "create")) { + ret = cmd_create(); + continue; + } + + if (!strcmp(argv[i], "join")) { + ret = cmd_join(); + if (!ret) + printf("Segment joined.\n"); + continue; + } + + if (!strcmp(argv[i], "destroy")) { + ret = cmd_destroy(); + continue; + } + + if (cmd_join()) + return -1; + + if (!strcmp(argv[i], "reportall")) { + ret = cmd_reportall(); + continue; + } + + if (!strcmp(argv[i], "bind") && (i + 1 < argc)) { + ret = cmd_bind(atol(argv[i+1])); + i += 1; + continue; + } + + if (!strcmp(argv[i], "signal") && (i + 1 < argc)) { + ret = cmd_signal(atol(argv[i+1])); + i += 1; + continue; + } + + if (!strcmp(argv[i], "bridge") && (i + 4 < argc)) { + ret = cmd_bridge(atol(argv[i+1]), + atol(argv[i+2]), + argv[i+3], + argv[i+4]); + i += 4; + continue; + } + + if (srcport == -1) { + if (!parse_ports(argv[i])) + printf("source port undefined: %s\n", argv[i]); + continue; + } + + if (dstport == -1) { + if (!parse_ports(argv[i])) + printf("destination port undefined: %s\n", argv[i]); + continue; + } + + if (!strcmp(argv[i], "report")) { + ret = cmd_report(dstport); + continue; + } + + if (!strcmp(argv[i], "alloc_requests") && (i + 1 < argc)) { + ret = cmd_alloc_requests(atol(argv[i+1])); + i += 1; + continue; + } + + if (!strcmp(argv[i], "free_requests") && (i + 1 < argc)) { + ret = cmd_free_requests(atol(argv[i+1])); + i += 1; + continue; + } + + if (!strcmp(argv[i], "put_requests")) { + ret = cmd_put_requests(); + continue; + } + + if (!strcmp(argv[i], "put_replies")) { + ret = cmd_put_replies(); + continue; + } + + if (!strcmp(argv[i], 
"complete") && (i + 1 < argc)) { + ret = cmd_finish(atol(argv[i+1]), 0); + i += 1; + continue; + } + + if (!strcmp(argv[i], "fail") && (i + 1 < argc)) { + ret = cmd_finish(atol(argv[i+1]), 1); + i += 1; + continue; + } + + if (!strcmp(argv[i], "wait") && (i + 1 < argc)) { + ret = cmd_wait(atol(argv[i+1])); + i += 1; + continue; + } + + if (!strcmp(argv[i], "rndwrite") && (i + 5 < argc)) { + long nr_loops = atol(argv[i+1]); + unsigned int seed = atoi(argv[i+2]); + unsigned int namesize = atoi(argv[i+3]); + unsigned int chunksize = atoi(argv[i+4]); + unsigned long objectsize = atol(argv[i+5]); + ret = cmd_rndwrite(nr_loops, seed, namesize, chunksize, objectsize); + i += 5; + continue; + } + + if (!strcmp(argv[i], "rndread") && (i + 5 < argc)) { + long nr_loops = atol(argv[i+1]); + unsigned int seed = atoi(argv[i+2]); + unsigned int namesize = atoi(argv[i+3]); + unsigned int chunksize = atoi(argv[i+4]); + unsigned long objectsize = atol(argv[i+5]); + ret = cmd_rndread(nr_loops, seed, namesize, chunksize, objectsize); + i += 5; + continue; + } + + if (!strcmp(argv[i], "read") && (i + 3 < argc)) { + char *name = argv[i+1]; + uint64_t offset = atol(argv[i+2]); + uint64_t size = atol(argv[i+3]); + ret = cmd_read(name, offset, size); + i += 3; + continue; + } + + if (!strcmp(argv[i], "write") && (i + 2 < argc)) { + char *name = argv[i+1]; + uint64_t offset = atol(argv[i+2]); + ret = cmd_write(name, offset); + i += 2; + continue; + } + + if (!strcmp(argv[i], "truncate") && (i + 2 < argc)) { + char *name = argv[i+1]; + uint64_t offset = atol(argv[i+2]); + ret = cmd_truncate(name, offset); + i += 2; + continue; + } + + if (!strcmp(argv[i], "delete") && (i + 1 < argc)) { + char *name = argv[i+1]; + ret = cmd_delete(name); + i += 1; + continue; + } + + if (!strcmp(argv[i], "acquire") && (i + 1 < argc)) { + char *name = argv[i+1]; + ret = cmd_acquire(name); + i += 1; + continue; + } + + if (!strcmp(argv[i], "release") && (i + 1 < argc)) { + char *name = argv[i+1]; + ret = 
cmd_release(name); + i += 1; + continue; + } + + if (!strcmp(argv[i], "copy") && (i + 2) < argc) { + char *src = argv[i+1]; + char *dst = argv[i+2]; + ret = cmd_copy(src, dst); + i += 2; + continue; + } + + if (!strcmp(argv[i], "clone") && (i + 2 < argc)) { + char *src = argv[i+1]; + char *dst = argv[i+2]; + ret = cmd_clone(src, dst); + i += 2; + continue; + } + + if (!parse_ports(argv[i])) + printf("invalid argument: %s\n", argv[i]); + } + + /* xseg_leave(); */ + return ret; +} diff --git a/xseg/sys/.gitignore b/xseg/sys/.gitignore new file mode 100644 index 0000000..5377c9b --- /dev/null +++ b/xseg/sys/.gitignore @@ -0,0 +1,4 @@ +Module.symvers +libxseg.map +modules.order +.tmp_versions diff --git a/xseg/sys/Makefile b/xseg/sys/Makefile new file mode 100644 index 0000000..16000b2 --- /dev/null +++ b/xseg/sys/Makefile @@ -0,0 +1,34 @@ +.PHONY: clean + +ifndef XSEG_HOME +$(error The XSEG_HOME variable must be set) +else +BASE=$(XSEG_HOME) +endif + +KDIR := /lib/modules/$(shell uname -r)/build +PWD := $(shell pwd) +EXTRA_CFLAGS += -I$(BASE) -DRELATIVE_POINTERS + +xseg-objs := xsegmod.o xq.k.o xseg.k.o +obj-m += xsegbd.o xsegdev.o xseg.o + +default: xq.k.c xseg.k.c + $(MAKE) -C $(KDIR) SUBDIRS=$(PWD) modules + +xq.k.c: $(BASE)/xq/xq.c $(BASE)/xq/xq.h + ln -sf $< $@ + +xseg.k.c: $(BASE)/xseg/xseg.c $(BASE)/xseg/xseg.h + ln -sf $< $@ + +xseg_user.o: xseg_user.c + $(CC) -I$(BASE) -Wall -O2 -finline-functions -fPIC -c -o $@ $< + +libxseg.map: $(BASE)/xq/xq_exports.h $(BASE)/xseg/xseg_exports.h + cat $(BASE)/xq/xq_exports.h $(BASE)/xseg/xseg_exports.h | ./make_symbol_map.sh > $@ + +clean: + rm -f xseg_user.o libxseg.map xq.k.c xseg.k.c + make -C /lib/modules/$(shell uname -r)/build M=$(PWD) clean + diff --git a/xseg/sys/make_symbol_map.sh b/xseg/sys/make_symbol_map.sh new file mode 100755 index 0000000..fcd21e8 --- /dev/null +++ b/xseg/sys/make_symbol_map.sh @@ -0,0 +1,8 @@ +#!/bin/sh + +echo '{' +echo 'global:' +sed -e 's/EXPORT_SYMBOL(\([^)]*\));/ \1;/' +echo 'local: 
*;' +echo '};' + diff --git a/xseg/sys/segtest.c b/xseg/sys/segtest.c new file mode 100644 index 0000000..3ce4ea3 --- /dev/null +++ b/xseg/sys/segtest.c @@ -0,0 +1,73 @@ +#include +#include +#include +#include +#include +#include +#include +#include + +#include "xsegdev.h" + +int fail(const char *msg) +{ + perror(msg); + return 1; +} + +int main(int argc, char **argv) +{ + int fd; + char *segment; + unsigned long i; + long segsize, oldsize; + + if (argc < 2) { + printf("Usage: ./segtest \n"); + return 1; + } + + segsize = atol(argv[1]) * 1024; + if (segsize < 0) + segsize = -segsize; + + fd = open("/dev/xsegdev", O_RDWR); + if (fd < 0) + return fail("/dev/xsegdev"); + + oldsize = ioctl(fd, XSEGDEV_IOC_SEGSIZE, 0); + if (oldsize < 0) { + + printf("No segment found. Creating...\n"); + + if (ioctl(fd, XSEGDEV_IOC_CREATESEG, segsize)) + return fail("CREATESEG"); + + } else if (segsize != oldsize) { + + printf("Destroying old segment...\n"); + + if (ioctl(fd, XSEGDEV_IOC_DESTROYSEG, 0)) + return fail("DESTROYSEG"); + + if (ioctl(fd, XSEGDEV_IOC_CREATESEG, segsize)) + return fail("CREATESEG"); + } + + segment = mmap( NULL, segsize, + PROT_READ | PROT_WRITE, + MAP_SHARED, fd, 0 ); + close(fd); + + if (segment == MAP_FAILED) + return fail("mmap"); + + for (i = 0; i < segsize; i++) + segment[i] = (char)(i & 0xff); + + for (i = 0; i < segsize; i++) + if (segment[i] != (char)(i & 0xff)) + printf("%lu: %d vs %ld\n", i, segment[i], (i & 0xff)); + return 0; +} + diff --git a/xseg/sys/segtool.c b/xseg/sys/segtool.c new file mode 100644 index 0000000..f98712d --- /dev/null +++ b/xseg/sys/segtool.c @@ -0,0 +1,415 @@ +#define _GNU_SOURCE +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "xsegdev.h" + +int help(void) +{ + printf("segtool [ ]* \n" + "commands:\n" + " create \n" + " destroy\n" + " info\n" + " map \n" + " unmap \n" + " dump \n" + " load \n" + " fill \n" + " mark\n" + " checkmark\n" + " wait\n" + ); + return 
1; +} + +#define ALLOC_MIN 4096 +#define ALLOC_MAX 1048576 + +void inputbuf(FILE *fp, char **retbuf, uint64_t *retsize) +{ + static uint64_t alloc_size; + static char *buf; + uint64_t size = 0; + char *p; + size_t r; + + if (alloc_size < ALLOC_MIN) + alloc_size = ALLOC_MIN; + + if (alloc_size > ALLOC_MAX) + alloc_size = ALLOC_MAX; + + p = realloc(buf, alloc_size); + if (!p) { + if (buf) + free(buf); + buf = NULL; + goto out; + } + + buf = p; + + while (!feof(fp)) { + r = fread(buf + size, 1, alloc_size - size, fp); + if (!r) + break; + size += r; + if (size >= alloc_size) { + p = realloc(buf, alloc_size * 2); + if (!p) { + if (buf) + free(buf); + buf = NULL; + size = 0; + goto out; + } + buf = p; + alloc_size *= 2; + } + } + +out: + *retbuf = buf; + *retsize = size; +} + +static int opendev(void) +{ + int fd = open("/dev/xsegdev", O_RDWR); + if (fd < 0) + perror("/dev/xsegdev"); + return fd; +} + +static char *segment; +static unsigned long mapped_size; + +int cmd_create(uint64_t size) +{ + int r, fd = opendev(); + if (fd < 0) + return fd; + + r = ioctl(fd, XSEGDEV_IOC_CREATESEG, size); + if (r < 0) + perror("CREATESEG"); + + close(fd); + return 0; +} + +int cmd_destroy(void) +{ + int r, fd = opendev(); + if (fd < 0) + return fd; + + r = ioctl(fd, XSEGDEV_IOC_DESTROYSEG, 0); + if (r < 0) + perror("DESTROYSEG"); + + close(fd); + return 0; +} + +int cmd_info(void) +{ + long r, fd = opendev(); + if (fd < 0) + return fd; + + r = ioctl(fd, XSEGDEV_IOC_SEGSIZE, 0); + if (r < 0) + perror("SEGSIZE"); + else + printf("Segment size: %lu bytes\n", r); + close(fd); + return 0; +} + +int cmd_map(uint64_t offset, uint64_t size) +{ + char *seg; + long r = -1, fd = opendev(); + if (fd < 0) + goto out; + + r = 0; + if (segment) + goto out; + + if (!size) { + r = ioctl(fd, XSEGDEV_IOC_SEGSIZE, 0); + if (r < 0) { + perror("SEGSIZE"); + goto out; + } + size = r - offset; + } + + if (offset + size > r) { + printf("segment size would be exceeded\n"); + goto out; + } + + r = -1; + //seg 
= mmap(NULL, size, PROT_READ | PROT_WRITE, MAP_SHARED, fd, offset); + seg = mmap( (void*) 0x37fd0000, size, + PROT_READ | PROT_WRITE, + MAP_SHARED | MAP_FIXED, + fd, offset); + if (seg == MAP_FAILED) { + perror("mmap"); + goto out; + } + + segment = seg; + mapped_size = size; +out: + close(fd); + return r; +} + +int cmd_unmap(uint64_t size) +{ + long r = -1, fd = opendev(); + if (fd < 0) + goto out; + + r = 0; + if (!segment) + goto out; + + if (!size) + size = mapped_size; + + r = munmap(segment, size); + if (r < 0) + perror("munmap"); + else { + segment = NULL; + mapped_size = 0; + } + +out: + close(fd); + return r; +} + +int cmd_wait(void) +{ + int c, fd = open("/dev/tty", O_RDONLY); + c = read(fd, &c, 1); + close(fd); + return 0; +} + +int cmd_dump(uint64_t offset, uint64_t size) +{ + long r = -1, fd = opendev(); + if (fd < 0) + goto out; + + if (!segment) { + printf("segment not mapped\n"); + goto out; + } + + if (!size) + size = mapped_size - offset; + + if (offset + size > mapped_size) { + printf("mapped segment size would be exceeded\n"); + goto out; + } + + for (r = offset; r < offset + size; r++) + if (fputc(segment[r], stdout) == EOF) + break; + + fflush(stdout); + r = 0; +out: + close(fd); + return r; +} + +int cmd_load(uint64_t offset) +{ + long r = -1, fd = opendev(); + unsigned long pos; + + if (fd < 0) + goto out; + + if (!segment) { + printf("segment not mapped\n"); + goto out; + } + + for (pos = offset; pos < mapped_size; pos++) { + int c = fgetc(stdin); + if (c == EOF) + break; + segment[pos] = c; + } +out: + close(fd); + return r; +} + +int cmd_fill(uint64_t offset, uint64_t size, int fill) +{ + long r = -1, fd = opendev(); + uint64_t misscount = 0; + + if (fd < 0) + goto out; + + if (!segment) { + printf("segment not mapped\n"); + goto out; + } + + if (size == 0) + size = mapped_size - offset; + + if (offset + size > mapped_size) { + printf("mapped segment size would be exceeded\n"); + goto out; + } + + memset(segment + offset, fill, size); + 
for (size += offset; offset < size; offset++) + if (segment[offset] != (char)fill) + misscount ++; + + if (misscount) + printf("fill misscount(!) %lu\n", misscount); +out: + close(fd); + return r; +} + +int cmd_mark(void) +{ + unsigned long i, count; + unsigned long *longs; + + if (!segment) { + printf("segment not mapped\n"); + return -1; + } + + longs = (void *)segment; + count = mapped_size / sizeof(long); + for (i = 0; i < count; i++) + longs[i] = i; + + return 0; +} + +int cmd_checkmark(void) +{ + unsigned long i, count; + unsigned long *longs; + + if (!segment) { + printf("segment not mapped\n"); + return -1; + } + + longs = (void *)segment; + count = mapped_size / sizeof(long); + for (i = 0; i < count; i++) + if (longs[i] != i) + printf("%lu != %lu\n", i, longs[i]); + return 0; +} + +int main(int argc, char **argv) { + + int i, ret = 0; + + if (argc < 2) + return help(); + + for (i = 1; i < argc; i++) { + + if (!strcmp(argv[i], "info")) { + ret = cmd_info(); + continue; + } + + if (!strcmp(argv[i], "create") && (i + 1 < argc)) { + ret = cmd_create(atol(argv[i+1])); + i += 1; + continue; + } + + if (!strcmp(argv[i], "destroy")) { + ret = cmd_destroy(); + continue; + } + + if (!strcmp(argv[i], "wait")) { + ret = cmd_wait(); + continue; + } + + if (!strcmp(argv[i], "mark")) { + ret = cmd_mark(); + continue; + } + + if (!strcmp(argv[i], "checkmark")) { + ret = cmd_checkmark(); + continue; + } + + if (!strcmp(argv[i], "map") && (i + 2 < argc)) { + ret = cmd_map(atol(argv[i+1]), atol(argv[i+2])); + i += 2; + continue; + } + + if (!strcmp(argv[i], "unmap") && (i + 1 < argc)) { + ret = cmd_unmap(atol(argv[i+1])); + i += 1; + continue; + } + + if (!strcmp(argv[i], "fill") && (i + 3 < argc)) { + ret = cmd_fill( atol(argv[i+1]), + atol(argv[i+2]), + strtoul(argv[i+3], NULL, 16)); + i += 3; + continue; + } + + if (!strcmp(argv[i], "dump") && (i + 2 < argc)) { + ret = cmd_dump(atol(argv[i+1]), atol(argv[i+2])); + i += 2; + continue; + } + + if (!strcmp(argv[i], "load") 
&& (i + 1 < argc)) { + ret = cmd_load(atol(argv[i+1])); + i += 1; + continue; + } + + return help(); + } + + return ret; +} diff --git a/xseg/sys/util.h b/xseg/sys/util.h new file mode 100644 index 0000000..8b023b0 --- /dev/null +++ b/xseg/sys/util.h @@ -0,0 +1,29 @@ +#ifndef XSEG_UTIL_H +#define XSEG_UTIL_H + +#ifdef __KERNEL__ + +#include +#include +#include + +#else + +#include +#include +#include +#include + +#endif + +void __xseg_log(const char *msg); +extern char __xseg_errbuf[4096]; +extern int (*xseg_snprintf)(char *str, size_t size, const char *format, ...); +void *xq_malloc(unsigned long size); +void xq_mfree(void *ptr); + +#define FMTARG(fmt, arg, format, ...) fmt format "%s", arg, ## __VA_ARGS__ +#define LOGMSG(...) xseg_snprintf(__xseg_errbuf, 4096, FMTARG("%s: ", __func__, ## __VA_ARGS__, "")), \ + __xseg_errbuf[4095] = 0, __xseg_log(__xseg_errbuf) + +#endif diff --git a/xseg/sys/xseg_user.c b/xseg/sys/xseg_user.c new file mode 100644 index 0000000..8f07dfc --- /dev/null +++ b/xseg/sys/xseg_user.c @@ -0,0 +1,62 @@ +#define _GNU_SOURCE +#include +#include +#include +#include +#include +#include +#include +#include + +int (*xseg_snprintf)(char *str, size_t size, const char *format, ...) 
= snprintf; + +char __xseg_errbuf[4096]; + +void __load_plugin(const char *name) +{ + void *dl; + void (*init)(void); + char _name[128]; + unsigned int namesize = strlen(name); + + strncpy(_name, "xseg_", 5); + strncpy(_name + 5, name, 80); + strncpy(_name + 5 + namesize, ".so", 3); + _name[5 + namesize + 3 ] = 0; + dl = dlopen(_name, RTLD_NOW); + if (!dl) { + LOGMSG("Cannot load plugin '%s': %s\n", _name, dlerror()); + return; + } + + strncpy(_name + 5 + namesize, "_init", 5); + _name[127] = 0; + init = (void (*)(void))(long)dlsym(dl, _name); + if (!init) { + LOGMSG("Init function '%s' not found!\n", _name); + return; + } + + init(); + //LOGMSG("Plugin '%s' loaded.\n", name); +} + +uint32_t __get_id(void) +{ + return syscall(SYS_gettid); +} + +void __xseg_log(const char *msg) +{ + (void)puts(msg); +} + +void *xq_malloc(unsigned long size) +{ + return malloc(size); +} + +void xq_mfree(void *ptr) +{ + free(ptr); +} diff --git a/xseg/sys/xsegbd.c b/xseg/sys/xsegbd.c new file mode 100644 index 0000000..e3f593d --- /dev/null +++ b/xseg/sys/xsegbd.c @@ -0,0 +1,714 @@ +/* xsegbd.c + * + */ + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "xsegdev.h" +#include "xsegbd.h" + +#define XSEGBD_MINORS 1 + +MODULE_DESCRIPTION("xsegbd"); +MODULE_AUTHOR("XSEG"); +MODULE_LICENSE("GPL"); + +static long sector_size = 100000; +static long blksize = 512; +static int major = 0; +static char name[XSEGBD_VOLUME_NAMELEN] = "xsegbd"; +static char spec[256] = "xsegdev:xsegbd:4:512:64:1024:12"; +static int src_portno = 0, dst_portno = 1, nr_requests = 128; + +module_param(sector_size, long, 0644); +module_param(blksize, long, 0644); +module_param(major, int, 0644); +module_param(src_portno, int, 0644); +module_param(dst_portno, int, 0644); +module_param(nr_requests, int, 0644); +module_param_string(name, name, sizeof(name), 0644); +module_param_string(spec, spec, sizeof(spec), 0644); + 
+static volatile int count; +struct semaphore xsegbd_lock; +static struct xsegbd xsegbd; + + +/* ********************* */ +/* ** XSEG Operations ** */ +/* ********************* */ + +static void *xsegdev_malloc(uint64_t size) +{ + return kmalloc((size_t)size, GFP_KERNEL); +} + +static void *xsegdev_realloc(void *mem, uint64_t size) +{ + return krealloc(mem, (size_t)size, GFP_KERNEL); +} + +static void xsegdev_mfree(void *ptr) +{ + return kfree(ptr); +} + +static long xsegdev_allocate(const char *name, uint64_t size) +{ + int r; + struct xsegdev *xsegdev = xsegdev_get(0); + + r = IS_ERR(xsegdev) ? PTR_ERR(xsegdev) : 0; + if (r) { + XSEGLOG("cannot acquire xsegdev"); + goto err; + } + + if (xsegdev->segment) { + XSEGLOG("destroying existing xsegdev segment"); + r = xsegdev_destroy_segment(xsegdev); + if (r) + goto err; + } + + XSEGLOG("creating xsegdev segment size %llu", size); + r = xsegdev_create_segment(xsegdev, size, 1); + if (r) + goto err; + + xsegdev->segsize = size; + xsegdev_put(xsegdev); + return 0; + +err: + return r; +} + +static long xsegdev_deallocate(const char *name) +{ + struct xsegdev *xsegdev = xsegdev_get(0); + int r = IS_ERR(xsegdev) ? PTR_ERR(xsegdev) : 0; + if (r) + return r; + + clear_bit(XSEGDEV_RESERVED, &xsegdev->flags); + XSEGLOG("destroying segment"); + r = xsegdev_destroy_segment(xsegdev); + if (r) + XSEGLOG(" ...failed"); + xsegdev_put(xsegdev); + return r; +} + +static long xseg_callback(void *arg); + +static void *xsegdev_map(const char *name, uint64_t size) +{ + struct xseg *xseg = NULL; + struct xsegdev *dev = xsegdev_get(0); + int r; + r = IS_ERR(dev) ? 
PTR_ERR(dev) : 0; + if (r) + goto out; + + if (!dev->segment) + goto put_out; + + if (size > dev->segsize) + goto put_out; + + if (dev->callback) /* in use */ + goto put_out; + + dev->callback = xseg_callback; + dev->callarg = &xsegbd; + xseg = (void *)dev->segment; + +put_out: + xsegdev_put(dev); +out: + return xseg; +} + +static void xsegdev_unmap(void *ptr, uint64_t size) +{ + struct xsegdev *xsegdev = xsegdev_get(0); + int r = IS_ERR(xsegdev) ? PTR_ERR(xsegdev) : 0; + if (r) + return; + + xsegdev->callarg = NULL; + xsegdev->callback = NULL; + xsegdev_put(xsegdev); +} + +static struct xseg_type xseg_xsegdev = { + /* xseg operations */ + { + .malloc = xsegdev_malloc, + .realloc = xsegdev_realloc, + .mfree = xsegdev_mfree, + .allocate = xsegdev_allocate, + .deallocate = xsegdev_deallocate, + .map = xsegdev_map, + .unmap = xsegdev_unmap + }, + /* name */ + "xsegdev" +}; + +static int posix_signal_init(void) +{ + return 0; +} + +static void posix_signal_quit(void) { } + +static int posix_prepare_wait(struct xseg_port *port) +{ + return 0; +} + +static int posix_cancel_wait(struct xseg_port *port) +{ + return 0; +} + +static int posix_wait_signal(struct xseg_port *port, uint32_t timeout) +{ + return 0; +} + +static int posix_signal(struct xseg_port *port) +{ + struct pid *pid; + struct task_struct *task; + int ret = -ENOENT; + + rcu_read_lock(); + pid = find_vpid((pid_t)port->waitcue); + if (!pid) + goto out; + task = pid_task(pid, PIDTYPE_PID); + if (!task) + goto out; + + ret = send_sig(SIGIO, task, 1); +out: + rcu_read_unlock(); + return ret; +} + +static void *posix_malloc(uint64_t size) +{ + return NULL; +} + +static void *posix_realloc(void *mem, uint64_t size) +{ + return NULL; +} + +static void posix_mfree(void *mem) { } + +static struct xseg_peer xseg_peer_posix = { + /* xseg signal operations */ + { + .signal_init = posix_signal_init, + .signal_quit = posix_signal_quit, + .cancel_wait = posix_cancel_wait, + .prepare_wait = posix_prepare_wait, + .wait_signal 
= posix_wait_signal, + .signal = posix_signal, + .malloc = posix_malloc, + .realloc = posix_realloc, + .mfree = posix_mfree + }, + /* name */ + "posix" +}; + +static int xsegdev_signal_init(void) +{ + return 0; +} + +static void xsegdev_signal_quit(void) { } + +static int xsegdev_prepare_wait(struct xseg_port *port) +{ + return -1; +} + +static int xsegdev_cancel_wait(struct xseg_port *port) +{ + return -1; +} + +static int xsegdev_wait_signal(struct xseg_port *port, uint32_t timeout) +{ + return -1; +} + +static int xsegdev_signal(struct xseg_port *port) +{ + return -1; +} + +static struct xseg_peer xseg_peer_xsegdev = { + /* xseg signal operations */ + { + .signal_init = xsegdev_signal_init, + .signal_quit = xsegdev_signal_quit, + .cancel_wait = xsegdev_cancel_wait, + .prepare_wait = xsegdev_prepare_wait, + .wait_signal = xsegdev_wait_signal, + .signal = xsegdev_signal, + .malloc = xsegdev_malloc, + .realloc = xsegdev_realloc, + .mfree = xsegdev_mfree + }, + /* name */ + "xsegdev" +}; + +/* ************************* */ +/* ** XSEG Initialization ** */ +/* ************************* */ + +int xsegbd_xseg_init(struct xsegbd *dev) +{ + struct xseg_port *xport; + int r; + + if (!dev->name[0]) + strncpy(dev->name, name, XSEGBD_VOLUME_NAMELEN); + + XSEGLOG("registering xseg types"); + dev->namesize = strlen(dev->name); + r = xseg_register_type(&xseg_xsegdev); + if (r) + goto err0; + + r = xseg_register_peer(&xseg_peer_posix); + if (r) + goto err1; + + r = xseg_register_peer(&xseg_peer_xsegdev); + if (r) + goto err2; + + r = xseg_initialize("xsegdev"); + if (r) { + XSEGLOG("cannot initialize 'xsegdev' peer"); + goto err3; + } + + r = xseg_parse_spec(spec, &dev->config); + if (r) + goto err3; + + if (strncmp(dev->config.type, "xsegdev", 16)) + XSEGLOG("WARNING: unexpected segment type '%s' vs 'xsegdev'", + dev->config.type); + + XSEGLOG("creating segment"); + r = xseg_create(&dev->config); + if (r) { + XSEGLOG("cannot create segment"); + goto err3; + } + + 
XSEGLOG("joining segment"); + dev->xseg = xseg_join("xsegdev", "xsegbd"); + if (!dev->xseg) { + XSEGLOG("cannot join segment"); + r = -EFAULT; + goto err3; + } + + XSEGLOG("binding to source port %u (destination %u)", + src_portno, dst_portno); + xport = xseg_bind_port(dev->xseg, src_portno); + if (!xport) { + XSEGLOG("cannot bind to port"); + dev->xseg = NULL; + r = -EFAULT; + goto err3; + } + dev->src_portno = xseg_portno(dev->xseg, xport); + dev->dst_portno = dst_portno; + + if (nr_requests > dev->xseg->config.nr_requests) + nr_requests = dev->xseg->config.nr_requests; + + if (xseg_alloc_requests(dev->xseg, src_portno, nr_requests)) { + XSEGLOG("cannot allocate requests"); + dev->xseg = NULL; + r = -EFAULT; + goto err3; + } + + return 0; +err3: + xseg_unregister_peer(xseg_peer_xsegdev.name); +err2: + xseg_unregister_peer(xseg_peer_posix.name); +err1: + xseg_unregister_type(xseg_xsegdev.name); +err0: + return r; +} + +int xsegbd_xseg_quit(struct xsegbd *dev) +{ + xseg_destroy(dev->xseg); + dev->xseg = NULL; + return 0; +} + + +/* ***************************** */ +/* ** Block Device Operations ** */ +/* ***************************** */ + +static int xsegbd_open(struct block_device *bdev, fmode_t mode) +{ + int ret = down_interruptible(&xsegbd_lock); + if (ret == 0) { + count ++; + up(&xsegbd_lock); + } + return ret; +} + +static int xsegbd_release(struct gendisk *gd, fmode_t mode) +{ + int ret = down_interruptible(&xsegbd_lock); + if (ret == 0) { + count --; + up(&xsegbd_lock); + } + return ret; +} + +static int xsegbd_ioctl(struct block_device *bdev, fmode_t mode, + unsigned int cmd, unsigned long arg) +{ + return -ENOTTY; +} + +static const struct block_device_operations xsegbd_ops = { + .owner = THIS_MODULE, + .open = xsegbd_open, + .release = xsegbd_release, + .ioctl = xsegbd_ioctl +}; + + +/* *************************** */ +/* ** Device Initialization ** */ +/* *************************** */ + +static void xseg_request_fn(struct request_queue *rq); + +static 
int xsegbd_dev_init(struct xsegbd *dev, int id, sector_t size) +{ + int ret = -ENOMEM; + struct gendisk *disk; + + spin_lock_init(&dev->lock); + + dev->id = id; + dev->blk_queue = blk_alloc_queue(GFP_KERNEL); + if (!dev->blk_queue) + goto out; + + blk_init_allocated_queue(dev->blk_queue, xseg_request_fn, &dev->lock); + dev->blk_queue->queuedata = dev; + + blk_queue_flush(dev->blk_queue, REQ_FLUSH | REQ_FUA); + blk_queue_logical_block_size(dev->blk_queue, 512); + blk_queue_physical_block_size(dev->blk_queue, blksize); + blk_queue_bounce_limit(dev->blk_queue, BLK_BOUNCE_ANY); + /* we can handle any number of segments, BUT + * parts of the request may be available far sooner than others + * but we cannot complete them (unless we handle their bios directly). + */ + blk_queue_max_segments(dev->blk_queue, 1); + queue_flag_set_unlocked(QUEUE_FLAG_NONROT, dev->blk_queue); + + /* vkoukis says we don't need partitions */ + dev->gd = disk = alloc_disk(1); + if (!disk) + goto out_free_queue; + + disk->major = major; + disk->first_minor = id * XSEGBD_MINORS; + disk->fops = &xsegbd_ops; + disk->queue = dev->blk_queue; + disk->private_data = dev; + disk->flags |= GENHD_FL_SUPPRESS_PARTITION_INFO; + snprintf(disk->disk_name, 32, "xsegbd%c", 'a' + id); + + ret = xsegbd_xseg_init(dev); + if (ret < 0) + goto out_free_disk; + + if (!xq_alloc_seq(&dev->blk_queue_pending, nr_requests, nr_requests)) + goto out_quit; + + dev->blk_req_pending = kmalloc(sizeof(struct request *) * nr_requests, GFP_KERNEL); + if (!dev->blk_req_pending) + goto out_free_pending; + + dev->sectors = size; + set_capacity(disk, dev->sectors); + + add_disk(disk); /* immediately activates the device */ + +out: + return ret; + +out_free_pending: + xq_free(&dev->blk_queue_pending); + +out_quit: + xsegbd_xseg_quit(dev); + +out_free_disk: + put_disk(disk); + +out_free_queue: + blk_cleanup_queue(dev->blk_queue); + + goto out; +} + +static int xsegbd_dev_destroy(struct xsegbd *dev) +{ + xq_free(&dev->blk_queue_pending); + 
kfree(dev->blk_req_pending); + del_gendisk(dev->gd); + put_disk(dev->gd); + blk_cleanup_queue(dev->blk_queue); + xsegbd_xseg_quit(dev); + return 0; +} + + +/* *************************** */ +/* ** Module Initialization ** */ +/* *************************** */ + +static int __init xsegbd_init(void) +{ + int ret; + + sema_init(&xsegbd_lock, 1); + + XSEGLOG("registering block device major %d", major); + ret = register_blkdev(major, XSEGBD_NAME); + if (ret < 0) { + XSEGLOG("cannot register block device!"); + ret = -EBUSY; + goto out; + } + major = ret; + XSEGLOG("registered block device major %d", major); + + XSEGLOG("initializing device"); + ret = xsegbd_dev_init(&xsegbd, 0, sector_size); + if (ret < 0) { + XSEGLOG("cannot initialize device!"); + goto unregister; + } + + XSEGLOG("initialization complete"); +out: + return ret; + +unregister: + unregister_blkdev(major, XSEGBD_NAME); + goto out; +} + +static void __exit xsegbd_exit(void) +{ + unregister_blkdev(major, XSEGBD_NAME); + + xseg_disable_driver(xsegbd.xseg, "posix"); + xseg_unregister_peer("posix"); + xseg_disable_driver(xsegbd.xseg, "xsegdev"); + xseg_unregister_peer("xsegdev"); + + xsegbd_dev_destroy(&xsegbd); + xseg_unregister_type("xsegdev"); +} + +module_init(xsegbd_init); +module_exit(xsegbd_exit); + + +/* ******************* */ +/* ** Critical Path ** */ +/* ******************* */ + +static void blk_to_xseg(struct xseg *xseg, struct xseg_request *xreq, + struct request *blkreq) +{ + struct bio_vec *bvec; + struct req_iterator iter; + uint64_t off = 0; + char *data = XSEG_TAKE_PTR(xreq->data, xseg->segment); + rq_for_each_segment(bvec, blkreq, iter) { + char *bdata = kmap_atomic(bvec->bv_page) + bvec->bv_offset; + memcpy(data + off, bdata, bvec->bv_len); + off += bvec->bv_len; + kunmap_atomic(bdata); + } +} + +static void xseg_to_blk(struct xseg *xseg, struct xseg_request *xreq, + struct request *blkreq) +{ + struct bio_vec *bvec; + struct req_iterator iter; + uint64_t off = 0; + char *data = 
XSEG_TAKE_PTR(xreq->data, xseg->segment); + rq_for_each_segment(bvec, blkreq, iter) { + char *bdata = kmap_atomic(bvec->bv_page) + bvec->bv_offset; + memcpy(bdata, data + off, bvec->bv_len); + off += bvec->bv_len; + kunmap_atomic(bdata); + } +} + +static void xseg_request_fn(struct request_queue *rq) +{ + struct xseg_request *xreq; + struct xsegbd *dev = rq->queuedata; + struct request *blkreq; + xqindex blkreq_idx; + char *name; + uint64_t datasize; + + for (;;) { + xreq = xseg_get_request(dev->xseg, dev->src_portno); + if (!xreq) + break; + + blkreq = blk_fetch_request(rq); + if (!blkreq) + break; + + if (blkreq->cmd_type != REQ_TYPE_FS) { + XSEGLOG("non-fs cmd_type: %u. *shrug*", blkreq->cmd_type); + __blk_end_request_all(blkreq, 0); + } + + datasize = blk_rq_bytes(blkreq); + BUG_ON(xreq->buffersize - dev->namesize < datasize); + BUG_ON(xseg_prep_request(xreq, dev->namesize, datasize)); + + name = XSEG_TAKE_PTR(xreq->name, dev->xseg->segment); + strncpy(name, dev->name, dev->namesize); + blkreq_idx = xq_pop_head(&dev->blk_queue_pending); + BUG_ON(blkreq_idx == None); + /* WARN_ON(dev->blk_req_pending[blkreq_idx] */ + dev->blk_req_pending[blkreq_idx] = blkreq; + xreq->priv = (void *)(unsigned long)blkreq_idx; + xreq->size = datasize; + xreq->offset = blk_rq_pos(blkreq) << 9; + /* + if (xreq->offset >= (sector_size << 9)) + XSEGLOG("sector offset: %lu > %lu, flush:%u, fua:%u", + blk_rq_pos(blkreq), sector_size, + blkreq->cmd_flags & REQ_FLUSH, + blkreq->cmd_flags & REQ_FUA); + */ + + if (blkreq->cmd_flags & REQ_FLUSH) + xreq->flags |= XSEG_FLUSH; + + if (blkreq->cmd_flags & REQ_FUA) + xreq->flags |= XSEG_FUA; + + if (rq_data_dir(blkreq)) { + /* unlock for data transfers? 
*/ + blk_to_xseg(dev->xseg, xreq, blkreq); + xreq->op = X_WRITE; + } else { + xreq->op = X_READ; + } + + BUG_ON(xseg_submit(dev->xseg, dev->dst_portno, xreq) == NoSerial); + } + + if (xreq) + xseg_put_request(dev->xseg, dev->src_portno, xreq); +} + +static long xseg_callback(void *arg) +{ + struct xsegbd *dev = arg; + struct xseg_request *xreq; + struct request *blkreq; + unsigned long flags; + xqindex blkreq_idx; + int err; + + for (;;) { + xreq = xseg_receive(dev->xseg, dev->src_portno); + if (!xreq) + break; + + /* we rely upon our peers to not have touched ->priv */ + blkreq_idx = (xqindex)(unsigned long)xreq->priv; + if (blkreq_idx < 0 || blkreq_idx >= nr_requests) { + XSEGLOG("invalid request index: %u! Ignoring.", blkreq_idx); + goto xseg_put; + } + + blkreq = dev->blk_req_pending[blkreq_idx]; + /* WARN_ON(!blkreq); */ + err = -EIO; + + if (!(xreq->state & XS_SERVED)) + goto blk_end; + + if (xreq->serviced != blk_rq_bytes(blkreq)) + goto blk_end; + + /* unlock for data transfer? */ + if (!rq_data_dir(blkreq)) + xseg_to_blk(dev->xseg, xreq, blkreq); + + err = 0; +blk_end: + blk_end_request_all(blkreq, err); + xq_append_head(&dev->blk_queue_pending, blkreq_idx); +xseg_put: + xseg_put_request(dev->xseg, xreq->portno, xreq); + } + + spin_lock_irqsave(&dev->lock, flags); + xseg_request_fn(dev->blk_queue); + spin_unlock_irqrestore(&dev->lock, flags); + return 0; +} + + diff --git a/xseg/sys/xsegbd.h b/xseg/sys/xsegbd.h new file mode 100644 index 0000000..9b57d77 --- /dev/null +++ b/xseg/sys/xsegbd.h @@ -0,0 +1,34 @@ +#ifndef _XSEGBD_REAR +#define _XSEGBD_REAR + +#define XSEGBD_NAME "xsegbd" + +#define XSEGLOG_PREFIX KERN_INFO XSEGBD_NAME ": " +#define XSEGLOG(message, args...) 
printk(XSEGLOG_PREFIX message "\n", ##args) + +#define XSEGBD_VOLUME_NAMELEN 32 + +#include +#include +#include +#include + +struct xsegbd { + spinlock_t lock; + struct request_queue *blk_queue; + struct gendisk *gd; + int id; + struct xsegbd_backer *backer; + sector_t sectors; + uint64_t segsize; + char name[XSEGBD_VOLUME_NAMELEN]; + uint32_t namesize; + struct xseg_config config; + struct xseg *xseg; + uint32_t src_portno, dst_portno; + struct xq blk_queue_pending; + char *_blk_queue_mem; + struct request **blk_req_pending; +}; + +#endif diff --git a/xseg/sys/xsegdev.c b/xseg/sys/xsegdev.c new file mode 100644 index 0000000..3887ceb --- /dev/null +++ b/xseg/sys/xsegdev.c @@ -0,0 +1,339 @@ +/* + * + */ + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "xsegdev.h" + +static struct xsegdev xsegdev; + +int xsegdev_create_segment(struct xsegdev *dev, u64 segsize, char reserved) +{ + void *segment; + int ret = mutex_lock_interruptible(&dev->mutex); + if (ret) + goto out; + + ret = -EBUSY; + if (dev->segment) + goto out_unlock; + + /* vmalloc can handle large sizes */ + ret = -ENOMEM; + segment = vmalloc(segsize); + if (!segment) + goto out_unlock; + + dev->segsize = segsize; + dev->segment = segment; + memset(dev->segment, 0, segsize); + set_bit(XSEGDEV_READY, &dev->flags); + if (reserved) + set_bit(XSEGDEV_RESERVED, &dev->flags); + ret = 0; + +out_unlock: + mutex_unlock(&dev->mutex); +out: + return ret; +} + +EXPORT_SYMBOL(xsegdev_create_segment); + +int xsegdev_destroy_segment(struct xsegdev *dev) +{ + int ret = mutex_lock_interruptible(&dev->mutex); + if (ret) + goto out; + + /* VERIFY: + * The segment trully dies when everyone in userspace has unmapped it. + * However, the kernel mapping is immediately destroyed. + * Kernel users are notified to abort via switching of XSEGDEV_READY. 
+ * The mapping deallocation is performed when all kernel users + * have stopped using the segment as reported by usercount. + */ + + ret = -EINVAL; + if (!dev->segment) + goto out_unlock; + + ret = -EBUSY; + if (test_bit(XSEGDEV_RESERVED, &dev->flags)) + goto out_unlock; + + clear_bit(XSEGDEV_READY, &dev->flags); + ret = wait_event_interruptible(dev->wq, atomic_read(&dev->usercount) <= 1); + if (ret) + goto out_unlock; + + vfree(dev->segment); + dev->segment = NULL; + dev->segsize = 0; + ret = 0; + +out_unlock: + mutex_unlock(&dev->mutex); + set_bit(XSEGDEV_READY, &dev->flags); +out: + return ret; +} + +EXPORT_SYMBOL(xsegdev_destroy_segment); + +struct xsegdev *xsegdev_get(int minor) +{ + struct xsegdev *dev = ERR_PTR(-ENODEV); + if (minor) + goto out; + + dev = &xsegdev; + atomic_inc(&dev->usercount); + if (!test_bit(XSEGDEV_READY, &dev->flags)) + goto fail_busy; +out: + return dev; + +fail_busy: + atomic_dec(&dev->usercount); + dev = ERR_PTR(-EBUSY); + goto out; +} + +EXPORT_SYMBOL(xsegdev_get); + +void xsegdev_put(struct xsegdev *dev) +{ + atomic_dec(&dev->usercount); + wake_up(&dev->wq); + /* ain't all this too heavy ? */ +} + +EXPORT_SYMBOL(xsegdev_put); + +/* ********************* */ +/* ** File Operations ** */ +/* ********************* */ + +struct xsegdev_file { + int minor; +}; + +static int xsegdev_open(struct inode *inode, struct file *file) +{ + struct xsegdev_file *vf = kmalloc(sizeof(struct xsegdev_file), GFP_KERNEL); + if (!vf) + return -ENOMEM; + vf->minor = 0; + file->private_data = vf; + return 0; +} + +static int xsegdev_release(struct inode *inode, struct file *file) +{ + struct xsegdev_file *vf = file->private_data; + kfree(vf); + return 0; +} + +static long xsegdev_ioctl(struct file *file, unsigned int cmd, unsigned long arg) +{ + struct xsegdev *dev; + char *seg; + long size; + int ret = -EINVAL; + + switch (cmd) { + + case XSEGDEV_IOC_CREATESEG: + dev = xsegdev_get(0); + ret = IS_ERR(dev) ? 
PTR_ERR(dev) : 0; + if (ret) + goto out; + + ret = xsegdev_create_segment(dev, (u64)arg, 0); + xsegdev_put(dev); + goto out; + + case XSEGDEV_IOC_DESTROYSEG: + dev = xsegdev_get(0); + ret = xsegdev_destroy_segment(&xsegdev); + xsegdev_put(dev); + goto out; + + case XSEGDEV_IOC_SEGSIZE: + dev = xsegdev_get(0); + + ret = IS_ERR(dev) ? PTR_ERR(dev) : 0; + if (ret) + goto out; + + size = dev->segsize; + seg = dev->segment; + xsegdev_put(dev); + + ret = -ENODEV; + if (!seg) + goto out; + + return size; + } + +out: + return ret; +} + +static ssize_t xsegdev_read(struct file *file, char __user *buf, + size_t count, loff_t *f_pos) +{ + return 0; +} + +static ssize_t xsegdev_write(struct file *file, const char __user *buf, + size_t count, loff_t *f_pos) +{ + struct xsegdev_file *vf = file->private_data; + struct xsegdev *dev = xsegdev_get(vf->minor); + int ret = -ENODEV; + if (!dev) + goto out; + + ret = -ENOSYS; + if (dev->callback) + ret = dev->callback(dev->callarg); + + xsegdev_put(dev); +out: + return ret; +} + +static int xsegdev_mmap(struct file *file, struct vm_area_struct *vma) +{ + struct xsegdev_file *vf = file->private_data; + struct xsegdev *dev; + size_t size = vma->vm_end - vma->vm_start; + unsigned long start = vma->vm_start, end = start + size; + char *ptr; + int ret = -ENODEV; + + dev = xsegdev_get(vf->minor); + if (IS_ERR(dev)) + goto out; + + ptr = dev->segment; + if (!ptr) + goto out_put; + + ret = -EINVAL; + + /* do not allow offset mappings, for now */ + if (vma->vm_pgoff || size > dev->segsize) + goto out_put; + + /* allow only shared, read-write mappings */ + if (!(vma->vm_flags & VM_SHARED)) + goto out_put; + + /* the segment is vmalloc() so we have to iterate through + * all pages and laboriously map them one by one. */ + for (; start < end; start += PAGE_SIZE, ptr += PAGE_SIZE) { + ret = remap_pfn_range(vma, start, vmalloc_to_pfn(ptr), + PAGE_SIZE, vma->vm_page_prot); + if (ret) + goto out_put; /* mmap syscall should clean up, right? 
*/ + } + + ret = 0; + +out_put: + xsegdev_put(dev); +out: + return ret; +} + +static struct file_operations xsegdev_ops = +{ + .owner = THIS_MODULE, + .open = xsegdev_open, + .release = xsegdev_release, + .read = xsegdev_read, + .write = xsegdev_write, + .mmap = xsegdev_mmap, + .unlocked_ioctl = xsegdev_ioctl, +}; + + +/* *************************** */ +/* ** Module Initialization ** */ +/* *************************** */ + +static void xsegdev_init(struct xsegdev *dev, int minor) +{ + dev->minor = 0; + dev->segment = NULL; + dev->segsize = 0; + dev->flags = 0; + atomic_set(&dev->usercount, 0); + init_waitqueue_head(&dev->wq); + cdev_init(&dev->cdev, &xsegdev_ops); + mutex_init(&dev->mutex); + spin_lock_init(&dev->lock); + dev->cdev.owner = THIS_MODULE; + set_bit(XSEGDEV_READY, &dev->flags); +} + +int __init xsegdev_mod_init(void) +{ + int ret; + dev_t dev_no = MKDEV(XSEGDEV_MAJOR, 0); + ret = register_chrdev_region(dev_no, 1, "xsegdev"); + if (ret < 0) + goto out; + + xsegdev_init(&xsegdev, 0); + ret = cdev_add(&xsegdev.cdev, dev_no, 1); + if (ret < 0) + goto out_unregister; + + return ret; + +out_unregister: + unregister_chrdev_region(dev_no, 1); +out: + return ret; +} + +void __exit xsegdev_mod_exit(void) +{ + dev_t dev_no = MKDEV(XSEGDEV_MAJOR, 0); + xsegdev_destroy_segment(&xsegdev); + cdev_del(&xsegdev.cdev); + unregister_chrdev_region(dev_no, 1); +} + +module_init(xsegdev_mod_init); +module_exit(xsegdev_mod_exit); + +MODULE_DESCRIPTION("xsegdev"); +MODULE_AUTHOR("XSEG"); +MODULE_LICENSE("GPL"); + diff --git a/xseg/sys/xsegdev.h b/xseg/sys/xsegdev.h new file mode 100644 index 0000000..4fb3f7c --- /dev/null +++ b/xseg/sys/xsegdev.h @@ -0,0 +1,52 @@ +/* + */ + +#ifndef _XSEGDEV_H +#define _XSEGDEV_H + +#define XSEGDEV_MAJOR 60 + +#ifdef __KERNEL__ + +#include +#include +#include +#include + +#define XSEGDEV_READY 1 +#define XSEGDEV_RESERVED 2 + +struct xsegdev { + int minor; + u64 segsize; + char *segment; + struct cdev cdev; + unsigned long flags; + long 
(*callback)(void *arg); + void *callarg; + + spinlock_t lock; + struct mutex mutex; + wait_queue_head_t wq; + atomic_t usercount; +}; + +int xsegdev_create_segment(struct xsegdev *dev, u64 segsize, char reserved); +int xsegdev_destroy_segment(struct xsegdev *dev); +struct xsegdev *xsegdev_get(int minor); +void xsegdev_put(struct xsegdev *dev); + + +#endif /* __KERNEL__ */ + +#include + +#define XSEGDEV_IOC_MAGIC XSEGDEV_MAJOR +#define XSEGDEV_IOC_CREATESEG _IOR(XSEGDEV_IOC_MAGIC, 0, unsigned long) +#define XSEGDEV_IOC_DESTROYSEG _IOR(XSEGDEV_IOC_MAGIC, 1, unsigned long) +#define XSEGDEV_IOC_SEGSIZE _IOR(XSEGDEV_IOC_MAGIC, 2, unsigned long) + +#define XSEGDEV_IOC_MAXNR 2 + +#endif /* _XSEGDEV_H */ + diff --git a/xseg/sys/xsegmod.c b/xseg/sys/xsegmod.c new file mode 100644 index 0000000..ccc56d3 --- /dev/null +++ b/xseg/sys/xsegmod.c @@ -0,0 +1,58 @@ +#include +#include +#include +#include + +int (*xseg_snprintf)(char *str, size_t size, const char *format, ...) = snprintf; + +char __xseg_errbuf[4096]; + +void __load_plugin(const char *name) +{ + return; +} + +uint32_t __get_id(void) +{ + return 1; +} + +void __xseg_preinit(void) +{ + return; +} + +void __xseg_log(const char *msg) +{ + (void)printk(KERN_INFO "%s\n", msg); +} + +void *xq_malloc(unsigned long size) +{ + return kmalloc(size, GFP_KERNEL); +} + +void xq_mfree(void *ptr) +{ + return kfree(ptr); +} + +static int __init xsegmod_init(void) +{ + printk(KERN_INFO "xseg loaded"); + return 0; +} + +static void __exit xsegmod_exit(void) +{ + printk(KERN_INFO "xseg unloaded"); + return; +} + +module_init(xsegmod_init); +module_exit(xsegmod_exit); + +MODULE_DESCRIPTION("xseg"); +MODULE_AUTHOR("XSEG"); +MODULE_LICENSE("GPL"); + diff --git a/xseg/xq/.gitignore b/xseg/xq/.gitignore new file mode 100644 index 0000000..ab29919 --- /dev/null +++ b/xseg/xq/.gitignore @@ -0,0 +1,2 @@ +xq_lock_test +xq_test diff --git a/xseg/xq/Makefile b/xseg/xq/Makefile new file mode 100644 index 0000000..d3c3d53 --- /dev/null +++ 
b/xseg/xq/Makefile @@ -0,0 +1,27 @@ +.PHONY: all clean + +BASE=.. +DEBUG=-g + +include $(BASE)/config.mk +include $(BASE)/base.mk + +all: xq.o xq.pic.o xq_test xq_lock_test + +$(BASE)/sys/xseg_user.o: + make -C $(BASE)/sys xseg_user.o + +xq_test: xq_test.c xq.o $(BASE)/sys/xseg_user.o + $(CC) $(CFLAGS) -o $@ $< xq.o $(BASE)/sys/xseg_user.o -lrt -lm -ldl + +xq_lock_test: xq_lock_test.c xq.o $(BASE)/sys/xseg_user.o + $(CC) $(CFLAGS) -o $@ $< xq.o $(BASE)/sys/xseg_user.o -lrt -lm -ldl + +xq.o: xq.c xq.h xq_lock.h + $(CC) $(CFLAGS) -DRELATIVE_POINTERS -c -o $@ $< + +xq.pic.o: xq.c xq.h xq_lock.h + $(CC) $(CFLAGS) -DRELATIVE_POINTERS -fPIC -c -o $@ $< + +clean: + rm -f xq_lock_test xq_test xq.o xq.pic.o diff --git a/xseg/xq/xq.c b/xseg/xq/xq.c new file mode 100644 index 0000000..e0bba20 --- /dev/null +++ b/xseg/xq/xq.c @@ -0,0 +1,323 @@ +#include + +#ifdef RELATIVE_POINTERS +#define PTR(p, f) ((typeof((p)->f))((unsigned long)(p) + (unsigned long)(p)->f)) +#define PTRSET(p, f, v) ((p)->f = (void *)((unsigned long)(v) - (unsigned long)(p))) +#else +#define PTR(p, f) (p)->f +#define PTRSET(p, f, v) ((p)->f = (v)) +#endif + +static inline int __snap(xqindex size) +{ + if (!size) + return 0; + return 1 << ((sizeof(size) * 8) - __builtin_clz(size) - 1); +} + +void xq_free(struct xq *xq) { + xq_mfree((void *)PTR(xq, queue)); + memset(xq, 0, sizeof(struct xq)); +} + +void xq_init_empty(struct xq *xq, xqindex size, void *mem) +{ + xq->head = 1; + xq->tail = 0; + PTRSET(xq, queue, mem); + xq->size = __snap(size); +} + +void xq_init_map(struct xq *xq, + xqindex size, + xqindex count, + xqindex (*mapfn)(xqindex), + void *mem) +{ + xqindex t, *qmem = mem; + xq->head = count + 1; + xq->tail = 0; + PTRSET(xq, queue, qmem); + xq->size = __snap(size); + for (t = 0; t < count; t++) + qmem[t] = mapfn(t); +} + +void xq_init_seq(struct xq *xq, xqindex size, xqindex count, void *mem) +{ + xqindex t, *qmem = mem; + xq->head = count + 1; + xq->tail = 0; + PTRSET(xq, queue, qmem); + xq->size = 
__snap(size); + for (t = 0; t < count; t++) + qmem[t] = t; +} + +xqindex *xq_alloc_empty(struct xq *xq, xqindex size) +{ + xqindex *mem = xq_malloc(size * sizeof(xqindex)); + if (!mem) + return mem; + xq_init_empty(xq, size, mem); + return mem; +} + +xqindex *xq_alloc_map(struct xq *xq, + xqindex size, + xqindex count, + xqindex (*mapfn)(xqindex) ) +{ + xqindex *mem = xq_malloc(size * sizeof(xqindex)); + if (!mem) + return mem; + xq_init_map(xq, size, count, mapfn, mem); + return mem; +} + +xqindex *xq_alloc_seq(struct xq *xq, xqindex size, xqindex count) +{ + xqindex *mem = xq_malloc(size * sizeof(xqindex)); + if (!mem) + return mem; + xq_init_seq(xq, size, count, mem); + return mem; +} + +xqindex xq_size(struct xq *xq) +{ + return xq->size; +} + +xqindex xq_count(struct xq *xq) +{ + return xq->head - xq->tail - 1; +} + +xqindex xq_element(struct xq *xq, xqindex index) +{ + return PTR(xq, queue)[index & (xq->size - 1)]; +} + +void xq_print(struct xq *xq) +{ + xqindex i; + + LOGMSG("xq head: %lu, tail: %lu, size: %lu\n", + (unsigned long)xq->head, + (unsigned long)xq->tail, + (unsigned long)xq->size); + i = xq->tail + 1; + + for (;;) { + if (i == xq->head) + break; + LOGMSG( "%lu %lu\n", + (unsigned long)i, + (unsigned long)xq_element(xq, i) ); + i += 1; + } +} + +xqindex __xq_append_head(struct xq *xq, xqindex nr) +{ + xqindex head = xq->head; + xq->head = head + nr; + return head; +} + +xqindex xq_append_heads(struct xq *xq, + xqindex nr, + xqindex *heads) +{ + xqindex i, mask, head; + xqindex serial = xq_acquire(&xq->lock, nr); + + if (!(xq_count(xq) + nr <= xq->size)) { + serial = None; + goto out; + } + + mask = xq->size -1; + head = __xq_append_head(xq, nr); + for (i = 0; i < nr; i++) + PTR(xq, queue)[(head + i) & mask] = heads[i]; +out: + xq_release(&xq->lock); + return serial; +} + +xqindex xq_append_head(struct xq *xq, xqindex xqi) +{ + xqindex serial = xq_acquire(&xq->lock, 1); + + if (xq_count(xq) >= xq->size) { + serial = None; + goto out; + } + PTR(xq, 
queue)[__xq_append_head(xq, 1) & (xq->size -1)] = xqi; +out: + xq_release(&xq->lock); + return serial; +} + +xqindex __xq_pop_head(struct xq *xq, xqindex nr) +{ + xqindex head = xq->head; + xq->head = head - nr; + return head - nr; +} + +xqindex xq_pop_heads(struct xq *xq, + xqindex nr, + xqindex *heads) +{ + xqindex i, mask, head; + xqindex serial = xq_acquire(&xq->lock, nr); + + if (xq_count(xq) < nr) { + serial = None; + goto out; + } + + mask = xq->size -1; + head = __xq_pop_head(xq, nr); + for (i = 0; i < nr; i++) + heads[i] = PTR(xq, queue)[(head - i) & mask]; +out: + xq_release(&xq->lock); + return serial; +} + +xqindex xq_pop_head(struct xq *xq) +{ + xqindex value = None; + (void)xq_acquire(&xq->lock, 1); + if (!xq_count(xq)) + goto out; + value = PTR(xq, queue)[__xq_pop_head(xq, 1) & (xq->size -1)]; +out: + xq_release(&xq->lock); + return value; +} + +xqindex __xq_append_tail(struct xq *xq, xqindex nr) +{ + xqindex tail = xq->tail; + xq->tail = tail - nr; + return tail; +} + +xqindex xq_append_tails(struct xq *xq, + xqindex nr, + xqindex *tails) +{ + xqindex i, mask, tail; + xqindex serial = xq_acquire(&xq->lock, nr); + + if (!(xq_count(xq) + nr <= xq->size)) { + serial = None; + goto out; + } + + mask = xq->size -1; + tail = __xq_append_tail(xq, nr); + for (i = 0; i < nr; i++) + PTR(xq, queue)[(tail - i) & mask] = tails[i]; +out: + xq_release(&xq->lock); + return serial; +} + +xqindex xq_append_tail(struct xq *xq, xqindex xqi) +{ + xqindex serial = xq_acquire(&xq->lock, 1); + + if (!(xq_count(xq) + 1 <= xq->size)) { + serial = None; + goto out; + } + PTR(xq, queue)[__xq_append_tail(xq, 1) & (xq->size -1)] = xqi; +out: + xq_release(&xq->lock); + return serial; +} + +xqindex __xq_pop_tail(struct xq *xq, xqindex nr) +{ + xqindex tail = xq->tail; + xq->tail = tail + nr; + return tail + 1; +} + +xqindex xq_pop_tails(struct xq *xq, xqindex nr, xqindex *tails) +{ + xqindex i, mask, tail; + xqindex serial = xq_acquire(&xq->lock, nr); + + if (xq_count(xq) < nr) { 
+ serial = None; + goto out; + } + + mask = xq->size -1; + tail = __xq_pop_tail(xq, nr); + for (i = 0; i < nr; i++) + tails[i] = PTR(xq, queue)[(tail + i) & mask]; +out: + xq_release(&xq->lock); + return serial; +} + +xqindex xq_pop_tail(struct xq *xq) +{ + xqindex value = None; + (void)xq_acquire(&xq->lock, 1); + if (!xq_count(xq)) + goto out; + value = PTR(xq, queue)[__xq_pop_tail(xq, 1) & (xq->size -1)]; +out: + xq_release(&xq->lock); + return value; +} + +int xq_head_to_tail(struct xq *headq, struct xq *tailq, xqindex nr) +{ + xqindex head, tail, hmask, tmask, *hq, *tq, i, ret = -1; + + if (headq >= tailq) { + xq_acquire(&headq->lock, nr); + xq_acquire(&tailq->lock, nr); + } else { + xq_acquire(&tailq->lock, nr); + xq_acquire(&headq->lock, nr); + } + + if (xq_count(headq) < nr || xq_count(tailq) + nr > tailq->size) + goto out; + + hmask = headq->size -1; + tmask = tailq->size -1; + head = __xq_pop_head(headq, nr); + tail = __xq_append_tail(tailq, nr); + hq = PTR(headq, queue); + tq = PTR(tailq, queue); + + for (i = 0; i < nr; i++) + tq[(tail - i) & tmask] = hq[(head - i) & hmask]; + + ret = 0; +out: + xq_release(&headq->lock); + xq_release(&tailq->lock); + return ret; +} + +#undef PTR +#undef PTRDEF + +#ifdef __KERNEL__ +#include +#include +#endif + diff --git a/xseg/xq/xq.h b/xseg/xq/xq.h new file mode 100644 index 0000000..f93eeb6 --- /dev/null +++ b/xseg/xq/xq.h @@ -0,0 +1,68 @@ +#ifndef _XQ_H +#define _XQ_H + +typedef unsigned int xqindex; + +#define None (xqindex)-1 + +#include +#include "xq_lock.h" + +struct xq { + struct xq_lock lock; + xqindex head, tail; + xqindex *queue; + xqindex size; +}; + +xqindex * xq_alloc_empty ( struct xq * xq, + xqindex size ); + +void xq_init_empty ( struct xq * xq, + xqindex size, + void * mem ); + +xqindex * xq_alloc_map ( struct xq * xq, + xqindex size, + xqindex count, + xqindex (* mapfn ) (xqindex) ); + +void xq_init_map ( struct xq * xq, + xqindex size, + xqindex count, + xqindex (* mapfn ) (xqindex), + void * mem ); + 
+xqindex * xq_alloc_seq ( struct xq * xq, + xqindex size, + xqindex count ); + +void xq_init_seq ( struct xq * xq, + xqindex size, + xqindex count, + void * mem ); + +void xq_free ( struct xq * xq ); + +xqindex xq_append_head ( struct xq * xq, + xqindex xqi ); + +xqindex xq_pop_head ( struct xq * xq ); + +xqindex xq_append_tail ( struct xq * xq, + xqindex xqi ); + +xqindex xq_pop_tail ( struct xq * xq ); + +int xq_head_to_tail ( struct xq * hq, + struct xq * tq, + xqindex nr ); + +xqindex xq_size ( struct xq * xq ); + +xqindex xq_count ( struct xq * xq ); + +void xq_print ( struct xq * xq ); + +#endif + diff --git a/xseg/xq/xq_exports.h b/xseg/xq/xq_exports.h new file mode 100644 index 0000000..a477ad3 --- /dev/null +++ b/xseg/xq/xq_exports.h @@ -0,0 +1,20 @@ +EXPORT_SYMBOL(xq_alloc_empty); +EXPORT_SYMBOL(xq_alloc_map); +EXPORT_SYMBOL(xq_alloc_seq); +EXPORT_SYMBOL(xq_append_head); +EXPORT_SYMBOL(xq_append_heads); +EXPORT_SYMBOL(xq_append_tail); +EXPORT_SYMBOL(xq_append_tails); +EXPORT_SYMBOL(xq_count); +EXPORT_SYMBOL(xq_element); +EXPORT_SYMBOL(xq_free); +EXPORT_SYMBOL(xq_head_to_tail); +EXPORT_SYMBOL(xq_init_empty); +EXPORT_SYMBOL(xq_init_map); +EXPORT_SYMBOL(xq_init_seq); +EXPORT_SYMBOL(xq_pop_head); +EXPORT_SYMBOL(xq_pop_heads); +EXPORT_SYMBOL(xq_pop_tail); +EXPORT_SYMBOL(xq_pop_tails); +EXPORT_SYMBOL(xq_print); +EXPORT_SYMBOL(xq_size); diff --git a/xseg/xq/xq_lock.h b/xseg/xq/xq_lock.h new file mode 100644 index 0000000..a306e91 --- /dev/null +++ b/xseg/xq/xq_lock.h @@ -0,0 +1,36 @@ +#ifndef _XQ_LOCK_H +#define _XQ_LOCK_H + +#define MFENCE() __sync_synchronize() +#define BARRIER() __asm__ __volatile__ ("" : "memory") +#define __pause() __asm__ __volatile__ ("pause\n"); +#undef __pause +#define __pause() + +struct xq_lock { + long lock; + unsigned long serial; +}; + +static inline unsigned long xq_acquire(struct xq_lock *lock, unsigned long nr) +{ + unsigned long __serial; + for (;;) { + for (; *(volatile unsigned long *)(&lock->lock); ) + __pause(); + + if 
(!__sync_fetch_and_sub(&lock->lock, 1)) + break; + } + + __serial = lock->serial; + lock->serial += nr; + return __serial; +} + +static inline void xq_release(struct xq_lock *lock) +{ + lock->lock = 0L; +} + +#endif diff --git a/xseg/xq/xq_lock_test.c b/xseg/xq/xq_lock_test.c new file mode 100644 index 0000000..e0ff661 --- /dev/null +++ b/xseg/xq/xq_lock_test.c @@ -0,0 +1,133 @@ +#define _GNU_SOURCE +#include +#include +#include +#include +#include +#include +#include + +#include "xq_lock.h" + +struct thread_data { + long loops; + struct xq_lock *lock; + long *counter; + int id; +}; + +void *race_thread(void *arg) +{ + struct thread_data *th = arg; + long loops = th->loops; + struct xq_lock *lock = th->lock; + long *counter = th->counter; + unsigned long serial = 0, oldserial = 0, total = 0, maxdiff = 0, diff = 0; + double totaldiff = 0.0; + unsigned long *diffstat; + long i; + + diffstat = calloc((int)log2(loops), sizeof(unsigned long)); + if (!diffstat) { + perror("malloc"); + return NULL; + } + + oldserial = xq_acquire(lock, 1); + xq_release(lock); + + printf("%d: starting at %lu\n", th->id, oldserial); + for (i = 0; i < loops; i++) { + //if ((i & 15) == 0) + //printf("%d: %lu\n", th->id, i); + asm volatile ("#boo"); + serial = xq_acquire(lock, 1); + asm volatile ("#bee"); + //serial = oldserial +1; + (*counter) ++; + diff = serial - oldserial; + oldserial = serial; + if (diff > maxdiff) + maxdiff = diff; + diffstat[(int)log2(diff)] ++; + if (diff > 1) { + total += 1; + totaldiff += diff; + } + xq_release(lock); + } + + xq_acquire(lock, 1); + printf("%d: serial %lu, avediff: %.0lf/%lu = %lf maxdiff: %lu\n", + th->id, serial, totaldiff, total, totaldiff/total, maxdiff); + printf("stats:\n"); + for (i = 0; i < (int)log2(loops); i++) + printf(" %012lu: %lu\n", (unsigned long)powl(2, i), diffstat[i]); + xq_release(lock); + return NULL; +} + +int error(const char *msg) { + perror(msg); + return 1; +} + +long lock_race(long nr_threads, long loops, struct xq_lock 
*lock, long *counter) +{ + struct thread_data *th = malloc(nr_threads * sizeof(struct thread_data)); + long t, r; + if (!th) + return error("malloc"); + + pthread_t *threads = malloc(nr_threads * sizeof(pthread_t)); + if (!threads) + return error("malloc"); + + for (t = 0; t < nr_threads; t++) { + th[t].id = t; + th[t].loops = loops; + th[t].counter = counter; + th[t].lock = lock; + } + + for (t = 0; t < nr_threads; t++) { + r = pthread_create(&threads[t], NULL, race_thread, &th[t]); + if (r) + return error("pthread_create"); + } + + for (t = 0; t < nr_threads; t++) { + pthread_join(threads[t], NULL); + } + + return nr_threads * loops - *counter; +} + +struct xq_lock lock; +long counter; + +int main(int argc, char **argv) +{ + long loops, nr_threads, r; + + if (argc < 3) { + printf("Usage: xq_lock_test \n"); + return 1; + } + + nr_threads = atoi(argv[1]); + if (nr_threads < 0) nr_threads = 2; + loops = atol(argv[2]); + if (loops < 0) loops = 1000; + + struct timeval tv0, tv1; + gettimeofday(&tv0, NULL); + r = lock_race(nr_threads, loops, &lock, &counter); + gettimeofday(&tv1, NULL); + double seconds = tv1.tv_sec + tv1.tv_usec/1000000.0 - tv0.tv_sec - tv0.tv_usec / 1000000.0; + printf("lock race complete with %ld errors in %lf seconds\n", r, seconds); + if (r) + return r; + + return 0; +} diff --git a/xseg/xq/xq_test.c b/xseg/xq/xq_test.c new file mode 100644 index 0000000..9eeda9c --- /dev/null +++ b/xseg/xq/xq_test.c @@ -0,0 +1,326 @@ +#define _GNU_SOURCE +#include +#include +#include +#include +#include +#include + +#include "xq.h" + +struct item { + long seed; + double seed_sin; + long seed_times_sin; + long seed_xor_times; +}; + +void item_calculate(struct item *item) { + item->seed_sin = sin(item->seed | 1); + item->seed_times_sin = (double)(item->seed | 1) * item->seed_sin; + item->seed_xor_times = item->seed ^ item->seed_times_sin; +} + +int item_verify(struct item *item) { + struct item t; + + t.seed = item->seed; + item_calculate(&t); + + /* + printf("seed 
%ld, sin: %lf, times: %ld, xor: %ld\n", + item->seed, item->seed_sin, item->seed_times_sin, item->seed_xor_times); + */ + + if (t.seed_sin != item->seed_sin || + t.seed_times_sin != item->seed_times_sin || + t.seed_xor_times != item->seed_xor_times) { + printf("seed %ld, sin: %lf, times: %ld, xor: %ld\n", + item->seed, item->seed_sin, item->seed_times_sin, item->seed_xor_times); + return 0; + } + + return 1; +} + +int basic_sanity_test(struct xq *q) { + xqindex t, r; + + //printf("append_tail 9183\n"); + r = xq_append_tail(q, 9183); + //xq_print(q); + //printf("\n"); + assert(r != None); + + //printf("pop_head 9183\n"); + r = xq_pop_head(q); + //xq_print(q); + //printf("\n"); + assert(r == 9183); + + //printf("append_head 1834\n"); + r = xq_append_head(q, 1834); + //xq_print(q); + //printf("\n"); + assert(r != None); + + //printf("pop_tail 1834\n"); + r = xq_pop_tail(q); + //xq_print(q); + //printf("\n"); + assert(r == 1834); + + //printf("append_tail 3814\n"); + xq_append_tail(q, 3814); + //xq_print(q); + //printf("\n"); + + //printf("append_head 5294\n"); + xq_append_head(q, 5294); + //xq_print(q); + //printf("\n"); + + //printf("append_tail 1983\n"); + r = xq_append_tail(q, 1983); + //xq_print(q); + //printf("\n"); + assert(r != None); + + //printf("pop_tail 1983\n"); + r = xq_pop_tail(q); + //xq_print(q); + //printf("\n"); + assert(r == 1983); + + //printf("append_head 8134\n"); + r = xq_append_head(q, 8134); + //xq_print(q); + //printf("\n"); + assert(r != None); + + //printf("pop_head 8134\n"); + r = xq_pop_head(q); + //xq_print(q); + //printf("\n"); + assert(r == 8134); + + //printf("pop_tail 3814\n"); + r = xq_pop_tail(q); + //xq_print(q); + //printf("\n"); + assert(r == 3814); + + //printf("pop_head 5294\n"); + r = xq_pop_head(q); + //xq_print(q); + //printf("\n"); + assert(r == 5294); + + //printf("pop_tail None\n"); + r = xq_pop_tail(q); + //xq_print(q); + //printf("\n"); + assert(r == None); + + //printf("pop_head None\n"); + r = xq_pop_head(q); + 
//xq_print(q); + //printf("\n"); + assert(r == None); + + xqindex qsize = q->size; + for (t = 0; t < qsize; t += 1) { + r = xq_append_tail(q, t); + //if (r == None) printf("None: %lu\n", (unsigned long)t); + //xq_print(q); + assert(r != None); + } + + //xq_print(q); + + for (t = qsize-1; t != None; t -= 1) { + r = xq_pop_tail(q); + assert(t == r); + //printf("%lu vs %lu\n", t, (unsigned long)xq_pop_tail(q)); + } + + return 0; +} + +struct thread_data { + long loops; + struct xq *q; + struct item *items; + struct random_data *rdata; + long size; + int id; +}; + +pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER; + +void *random_test_thread(void *arg) { + struct thread_data *th = arg; + long loops = th->loops; + struct xq *q = th->q; + struct item *items = th->items; + //struct random_data *rdata = th->rdata; + int id = th->id; + long i; + + + /* + pthread_mutex_lock(&mutex); + printf("---->\n"); + xq_print(&q[0]); + xq_print(&q[1]); + printf("<----\n"); + pthread_mutex_unlock(&mutex); + */ + + for (i = 0; i < loops; i++) { + int32_t rand; + xqindex xqi; + + if ((i & (1024*1024 -1)) == 0) { + printf("%d %ld\n", id, i); + } + + //random_r(rdata, &rand); + rand = random(); + + switch (rand & 3) { + case 0: + xqi = xq_pop_tail(&q[0]); + if (xqi == None) goto unlock; + items[xqi].seed = rand; + item_calculate(&items[xqi]); + xq_append_head(&q[1], xqi); + break; + case 1: + xqi = xq_pop_head(&q[0]); + if (xqi == None) goto unlock; + items[xqi].seed = rand; + item_calculate(&items[xqi]); + xq_append_tail(&q[1], xqi); + break; + case 2: + xqi = xq_pop_tail(&q[1]); + if (xqi == None) goto unlock; + items[xqi].seed = rand; + item_calculate(&items[xqi]); + xq_append_head(&q[0], xqi); + break; + case 3: + xqi = xq_pop_head(&q[1]); + if (xqi == None) goto unlock; + items[xqi].seed = rand; + item_calculate(&items[xqi]); + xq_append_tail(&q[0], xqi); + break; + } + unlock: + ; + } + + return NULL; +} + +int error(const char *msg) { + perror(msg); + return 1; +} + +int 
random_test(long seed, long nr_threads, long loops, xqindex qsize, struct xq *q) { + srandom(seed); + + struct thread_data *th = malloc(nr_threads * sizeof(struct thread_data)); + if (!th) return error("malloc"); + + long t, r; + + struct item *items = malloc(qsize * sizeof(struct item)); + if (!items) return error("malloc"); + + for (t = 0; t < qsize; t += 1) item_calculate(&items[t]); + + for (t = 0; t < qsize; t += 4) { + xq_append_tail(&q[0], t+0); + xq_append_head(&q[0], t+1); + xq_append_tail(&q[1], t+2); + xq_append_head(&q[1], t+3); + } + + pthread_t *threads = malloc(nr_threads * sizeof(pthread_t)); + if (!threads) return error("malloc"); + + //struct random_data *rdata = malloc(nr_threads * sizeof(struct random_data)); + //if (!rdata) return error("malloc"); + + for (t = 0; t < nr_threads; t++) { + th[t].id = t; + th[t].loops = loops; + th[t].size = qsize; + th[t].q = q; + //th[t].rdata = &rdata[t]; + th[t].items = items; + //srandom_r(random(), th[t].rdata); + } + + for (t = 0; t < nr_threads; t++) { + r = pthread_create(&threads[t], NULL, random_test_thread, &th[t]); + if (r) return error("pthread_create"); + } + + for (t = 0; t < nr_threads; t++) { + pthread_join(threads[t], NULL); + } + + int errors = 0; + for (t = 0; t < qsize; t++) { + if (!item_verify(&items[t])) { + errors ++; + printf("error: item %ld\n", t); + }; + } + + return errors; +} + +struct xq q[2]; + +int main(int argc, char **argv) { + int r; + + if (argc < 5) { + printf("Usage: struct xqest \n"); + return 1; + } + + long seed = atol(argv[1]); + int nr_threads = atoi(argv[2]); + long loops = atol(argv[3]); + long qsize = atol(argv[4]); + + if (nr_threads < 0) nr_threads = 2; + if (loops < 0) loops = 1000; + if (qsize < 0) qsize = 1000; + + xq_alloc_empty(&q[0], qsize); + xq_alloc_empty(&q[1], qsize); + qsize = q[0].size; + assert(q[1].size == qsize); + + r = basic_sanity_test(&q[0]); + if (r) return r; + printf("basic sanity test complete.\n"); + + struct timeval tv0, tv1; + 
gettimeofday(&tv0, NULL); + r = random_test(seed, nr_threads, loops, qsize, q); + gettimeofday(&tv1, NULL); + double seconds = tv1.tv_sec + tv1.tv_usec/1000000.0 - tv0.tv_sec - tv0.tv_usec / 1000000.0; + printf("random multi-thread test complete with %d errors in %lf seconds\n", r, seconds); + if (r) return r; + + return 0; +} diff --git a/xseg/xseg/.gitignore b/xseg/xseg/.gitignore new file mode 100644 index 0000000..9710b93 --- /dev/null +++ b/xseg/xseg/.gitignore @@ -0,0 +1 @@ +_initialize.c diff --git a/xseg/xseg/Makefile b/xseg/xseg/Makefile new file mode 100644 index 0000000..758eead --- /dev/null +++ b/xseg/xseg/Makefile @@ -0,0 +1,75 @@ +.PHONY: all clean drivers + +BASE=.. + +include $(BASE)/config.mk +include $(BASE)/base.mk + +MAJOR=0 +MINOR=0.1 +AR=ar + +SEGMENTS=xseg_posix +DRVDIR=$(BASE)/drivers +DRVOBJS=$(addsuffix .o, $(addprefix $(DRVDIR)/, $(SEGMENTS))) + +all: libxseg.a libxseg.so + + +COMMA=, +_initialize.c: drivers + echo "void __xseg_preinit(void) {" \ + $(addprefix "void ",$(addsuffix _init"(void);",$(SEGMENTS))) \ + $(addsuffix _init"(); ",$(SEGMENTS)) '}' > _initialize.c + +xseg.o: xseg.c xseg.h $(BASE)/xq/xq.h + $(CC) $(CFLAGS) -c -o $@ $< + +xseg.pic.o: xseg.c xseg.h _initialize.c + $(CC) $(CFLAGS) -fPIC -c -o $@ $< + +libxseg.so: libxseg.so.$(MAJOR) + ln -sf $< $@ + cp -vaf $@ ../lib + +libxseg.so.$(MAJOR): libxseg.so.$(MAJOR).$(MINOR) + ln -sf $< $@ + cp -vaf $@ ../lib + +$(BASE)/sys/xseg_user.o: + make -C $(BASE)/sys xseg_user.o + +drivers: + make -C $(DRVDIR) $(addsuffix .o, $(SEGMENTS)) + +$(BASE)/xq/xq.o: + make -C $(BASE)/xq xq.o + +$(BASE)/xq/xq.pic.o: + make -C $(BASE)/xq xq.pic.o + +$(BASE)/sys/libxseg.map: + make -C $(BASE)/sys libxseg.map + +libxseg.so.$(MAJOR).$(MINOR): xseg.pic.o $(BASE)/sys/xseg_user.o $(BASE)/sys/libxseg.map \ + $(BASE)/xq/xq.pic.o $(DRVOBJS) + $(CC) $(CFLAGS) -shared \ + -Wl,-soname=libxseg.so.$(MAJOR) \ + -o libxseg.so.$(MAJOR).$(MINOR) \ + xseg.pic.o $(BASE)/sys/xseg_user.o $(BASE)/xq/xq.pic.o \ + 
_initialize.o $(DRVOBJS) \ + -Wl,--version-script=$(BASE)/sys/libxseg.map \ + -ldl -lrt + cp -vaf $@ ../lib + +libxseg.a: xseg.o $(BASE)/xq/xq.o drivers _initialize.o + $(AR) r libxseg.a xseg.o $(BASE)/xq/xq.o _initialize.o $(DRVOBJS) + cp -vaf $@ ../lib + +clean: + make -C $(DRVDIR) clean + rm -f _initialize.c _initialize.o + rm -f xseg.o xseg.pic.o + rm -f libxseg.a + rm -f libxseg.so libxseg.so.$(MAJOR) libxseg.so.$(MAJOR).$(MINOR) + diff --git a/xseg/xseg/xseg.c b/xseg/xseg/xseg.c new file mode 100644 index 0000000..61607d8 --- /dev/null +++ b/xseg/xseg/xseg.c @@ -0,0 +1,875 @@ +#include +#include + +#ifndef NULL +#define NULL ((void *)0) +#endif + +#define XSEG_NR_TYPES 16 +#define XSEG_NR_PEER_TYPES 64 +#define XSEG_MIN_PAGE_SIZE 4096 + +#define __align(x, shift) (((((x) -1) >> (shift)) +1) << (shift)) + +static struct xseg_type *__types[XSEG_NR_TYPES]; +static unsigned int __nr_types; +static struct xseg_peer *__peer_types[XSEG_NR_PEER_TYPES]; +static unsigned int __nr_peer_types; +static struct xseg_peer __peer_type; + +/* assuming size_t is long */ +/* +void *memcpy(void *dest, const void *src, unsigned long n); +int strncmp(const char *s1, const char *s2, unsigned long n); +char *strncpy(char *dest, const char *src, unsigned long n); +void *memset(void *s, int c, unsigned long n); +*/ + +void __load_plugin(const char *name); +void __xseg_preinit(void); +uint32_t __get_id(void); + +static void __lock_segment(struct xseg *xseg) +{ + volatile unsigned long *flags; + flags = &xseg->shared->flags; + while (__sync_fetch_and_or(flags, XSEG_F_LOCK)); +} + +static void __unlock_segment(struct xseg *xseg) +{ + volatile unsigned long *flags; + flags = &xseg->shared->flags; + __sync_fetch_and_and(flags, ~XSEG_F_LOCK); +} + +static struct xseg_type *__find_type(const char *name, long *index) +{ + long i; + for (i = 0; (*index = i) < __nr_types; i++) + if (!strncmp(__types[i]->name, name, XSEG_TNAMESIZE)) + return __types[i]; + return NULL; +} + +static struct 
xseg_peer *__find_peer_type(const char *name, long *index) +{ + long i; + for (i = 0; (*index = i) < __nr_peer_types; i++) { + if (!strncmp(__peer_types[i]->name, name, XSEG_TNAMESIZE)) + return __peer_types[i]; + } + return NULL; +} + +void xseg_report_peer_types(void) +{ + long i; + LOGMSG("total %u peer types:\n", __nr_peer_types); + for (i = 0; i < __nr_peer_types; i++) + LOGMSG("%ld: '%s'\n", i, __peer_types[i]->name); +} + +static struct xseg_type *__find_or_load_type(const char *name) +{ + long i; + struct xseg_type *type = __find_type(name, &i); + if (type) + return type; + + __load_plugin(name); + return __find_type(name, &i); +} + +static struct xseg_peer *__find_or_load_peer_type(const char *name) +{ + long i; + struct xseg_peer *peer_type = __find_peer_type(name, &i); + if (peer_type) + return peer_type; + + __load_plugin(name); + return __find_peer_type(name, &i); +} + +/* Resolve the peer type registered under slot 'serial' of the segment. + * The local cache xseg->peer_types[] is consulted first; on a miss the + * name stored in the shared table is looked up (loading a plugin if needed) + * and the result, possibly NULL, is cached. + * Returns NULL for an out-of-range or empty slot. + */ +static struct xseg_peer *__get_peer_type(struct xseg *xseg, uint32_t serial) +{ + char *name; + char (*shared_peer_types)[XSEG_TNAMESIZE]; + struct xseg_peer *type; + + if (serial >= xseg->max_peer_types) + return NULL; + + type = xseg->peer_types[serial]; + if (type) + return type; + + + if (serial >= (1 << xseg->config.page_shift) / XSEG_TNAMESIZE) + return NULL; + + /* shared->peer_types is stored segment-relative (XSEG_MAKE_PTR), + * so translate it to a local address before indexing, + * the same way __enable_driver() does. 
+ */ + shared_peer_types = XSEG_TAKE_PTR(xseg->shared->peer_types, xseg->segment); + name = shared_peer_types[serial]; + if (!*name) + return NULL; + + type = __find_or_load_peer_type(name); + xseg->peer_types[serial] = type; + return type; +} + +/* Nonzero when portno indexes an existing port of the segment. */ +static inline int __validate_port(struct xseg *xseg, uint32_t portno) +{ + return portno < xseg->config.nr_ports; +} + +/* type:name:nr_ports:nr_requests:request_size:extra_size:page_shift */ + +/* Cut the next ':'-separated token off (sp): (s) is left pointing at it and + * the ':' is overwritten with a NUL. The default (def) is substituted only + * when the input is already exhausted, i.e. the token is empty at the + * terminating NUL, so a final token not followed by ':' is kept. + */ +#define TOK(s, sp, def) \ + (s) = (sp); \ + for (;;) { \ + switch (*(sp)) { \ + case 0: \ + if ((s) == (sp)) \ + (s) = (def); \ + break; \ + case ':': \ + *(sp)++ = 0; \ + break; \ + default: \ + (sp) ++; \ + continue; \ + } \ + break; \ + } \ + +/* Convert the leading decimal digits of s; stops at the first non-digit. + * No overflow checking is done. + */ +static unsigned long strul(char *s) +{ + unsigned long n = 0; + for (;;) { + unsigned char c = *s - '0'; + if (c >= 10) + break; + n = n * 10 + c; + s ++; + } + return n; +} + +/* +static
char *strncopy(char *dest, const char *src, uint32_t n) +{ + uint32_t i; + char c; + for (i = 0; i < n; i++) { + c = src[i]; + dest[i] = c; + if (!c) + break; + } + dest[n-1] = 0; + return dest; +} +*/ + +/* Parse a segment spec of the form + * type:name:nr_ports:nr_requests:request_size:extra_size:page_shift + * into *config. segspec is modified in place (TOK() overwrites each ':' + * with a NUL). Fields missing at the end of the spec take the defaults + * passed to TOK() below. type and name are truncated to fit and always + * NUL-terminated. Always returns 0. + */ +int xseg_parse_spec(char *segspec, struct xseg_config *config) +{ + /* default: "posix:globalxseg:4:512:64:128:12" */ + char *s = segspec, *sp = segspec; + + /* type */ + TOK(s, sp, "posix"); + strncpy(config->type, s, XSEG_TNAMESIZE); + config->type[XSEG_TNAMESIZE-1] = 0; + + /* name */ + TOK(s, sp, "globalxseg"); + strncpy(config->name, s, XSEG_NAMESIZE); + config->name[XSEG_NAMESIZE-1] = 0; + + /* nr_ports */ + TOK(s, sp, "4"); + config->nr_ports = strul(s); + + /* nr_requests */ + TOK(s, sp, "512"); + config->nr_requests = strul(s); + + /* request_size */ + TOK(s, sp, "64"); + config->request_size = strul(s); + + /* extra_size */ + TOK(s, sp, "128"); + config->extra_size = strul(s); + + /* page_shift */ + TOK(s, sp, "12"); + config->page_shift = strul(s); + return 0; +} + +/* Add a segment type to the local registry. + * Returns 0 on success, -1 if the name is already registered, + * -2 if the registry is full. The type's name is forcibly NUL-terminated. 
+ */ +int xseg_register_type(struct xseg_type *type) +{ + long i; + struct xseg_type *__type = __find_type(type->name, &i); + if (__type) { + LOGMSG("type %s already exists\n", type->name); + return -1; + } + + if (__nr_types >= XSEG_NR_TYPES) { + LOGMSG("maximum type registrations reached: %u\n", __nr_types); + return -2; + } + + type->name[XSEG_TNAMESIZE-1] = 0; + __types[__nr_types] = type; + __nr_types += 1; + + return 0; +} + +/* Remove a segment type from the local registry by name. + * The last registered entry is moved into the freed slot. + * Returns 0 on success, -1 if no such type is registered. + */ +int xseg_unregister_type(const char *name) +{ + long i; + struct xseg_type *__type = __find_type(name, &i); + if (!__type) { + LOGMSG("segment type '%s' does not exist\n", name); + return -1; + } + + __nr_types -= 1; + __types[i] = __types[__nr_types]; + __types[__nr_types] = NULL; + return 0; +} + +int xseg_register_peer(struct xseg_peer *peer_type) +{ + long i; + struct xseg_peer *type = __find_peer_type(peer_type->name, &i); + if (type) { + LOGMSG("peer type '%s' already exists\n", type->name); + return -1; + } + + if (__nr_peer_types >= XSEG_NR_PEER_TYPES) { +
LOGMSG("maximum peer type registrations reached: %u\n", + __nr_peer_types); + return -2; + } + + if (peer_type->peer_ops.signal_init()) { + LOGMSG("peer type '%s': signal initialization failed\n", + peer_type->name); + return -3; + } + peer_type->name[XSEG_TNAMESIZE-1] = 0; + __peer_types[__nr_peer_types] = peer_type; + __nr_peer_types += 1; + return 0; +} + +/* Remove a peer type from the local registry by name and call its + * signal_quit() hook. The last registered entry is moved into the freed slot. + * Returns 0 on success, -1 if no such peer type is registered. + */ +int xseg_unregister_peer(const char *name) +{ + long i; + struct xseg_peer *__type = __find_peer_type(name, &i); + if (!__type) { + LOGMSG("peer type '%s' does not exist\n", name); + return -1; + } + + __nr_peer_types -= 1; + __peer_types[i] = __peer_types[__nr_peer_types]; + __peer_types[__nr_peer_types] = NULL; + __type->peer_ops.signal_quit(); + + return 0; +} + +/* Record 'driver' in the segment's shared peer-type name table, reusing the + * slot if its name is already present, and cache it in xseg->peer_types[]. + * Returns the slot index (>= 0) on success and a negative value on failure; + * xseg_bind_port() stores this index as the port's peer_type. + * Both callers in this file invoke it with the segment lock held. 
+ */ +long __enable_driver(struct xseg *xseg, struct xseg_peer *driver) +{ + long r; + char (*drivers)[XSEG_TNAMESIZE]; + uint32_t max_drivers = xseg->max_peer_types; + + if (xseg->shared->nr_peer_types >= max_drivers) { + LOGMSG("cannot register '%s': driver namespace full\n", + driver->name); + return -1; + } + + drivers = XSEG_TAKE_PTR(xseg->shared->peer_types, xseg->segment); + for (r = 0; r < max_drivers; r++) { + if (!*drivers[r]) + goto bind; + if (!strncmp(drivers[r], driver->name, XSEG_TNAMESIZE)) + goto success; + } + + /* Unreachable */ + return -666; + +bind: + /* assert(xseg->shared->nr_peer_types == r); */ + xseg->shared->nr_peer_types = r + 1; + strncpy(drivers[r], driver->name, XSEG_TNAMESIZE); + drivers[r][XSEG_TNAMESIZE-1] = 0; + +success: + xseg->peer_types[r] = driver; + return r; +} + +/* Look up a locally registered peer type by name and enable it on the + * segment under the segment lock. + * Returns the value of __enable_driver(), or -1 if the name is unknown. + */ +long xseg_enable_driver(struct xseg *xseg, const char *name) +{ + long r; + struct xseg_peer *driver = __find_peer_type(name, &r); + + if (!driver) { + LOGMSG("driver '%s' not found\n", name); + return -1; + } + + __lock_segment(xseg); + r = __enable_driver(xseg, driver); + __unlock_segment(xseg); + return r; +} + +int xseg_disable_driver(struct xseg *xseg, const char *name) +{ + long i; + struct xseg_peer *driver = __find_peer_type(name, &i); + if
(!driver) { + LOGMSG("driver '%s' not found\n", name); + return -1; + } + + for (i = 0; i < xseg->max_peer_types; i++) + if (xseg->peer_types[i] == driver) + xseg->peer_types[i] = NULL; + return 0; +} + +/* NOTE: calculate_segment_size() and initialize_segment() + * must always be exactly in sync! +*/ + +static uint64_t calculate_segment_size(struct xseg_config *config) +{ + uint64_t size = 0; + uint32_t page_size, page_shift = config->page_shift; + + /* assert(sizeof(struct xseg) <= (1 << 9)); */ + + if (page_shift < 9) { + LOGMSG("page_shift must be >= %d\n", 9); + return 0; + } + + page_size = 1 << page_shift; + + /* struct xseg itself */ + size += page_size; + /* free requests queue struct */ + size += page_size; + + size += config->nr_requests * sizeof(struct xseg_request); + size = __align(size, page_shift); + + size += config->nr_requests * config->request_size * page_size; + + size += config->nr_ports * sizeof(struct xseg_port); + size = __align(size, page_shift); + + /* queue entries for 3 xqueues per port... 
*/ + size += config->nr_ports * 3 * config->nr_requests * sizeof(xqindex); + size = __align(size, page_shift); + + /* ...and one global free queue */ + size += config->nr_requests * sizeof(xqindex); + size = __align(size, page_shift); + + size += config->extra_size; + size = __align(size, page_shift); + + size += sizeof(struct xseg_shared); + size = __align(size, page_shift); + + /* page for type names */ + size += page_size; + + return size; +} + +static long initialize_segment(struct xseg *xseg, struct xseg_config *cfg) +{ + uint32_t page_shift = cfg->page_shift, page_size = 1 << page_shift; + struct xseg_shared *shared; + char *segment = (char *)xseg; + struct xq *q; + void *qmem; + uint64_t bodysize, size = page_size, i, nr_requests = cfg->nr_requests; + + if (page_size < XSEG_MIN_PAGE_SIZE) + return -1; + + xseg->free_requests = XSEG_MAKE_PTR(segment + size, segment); + size += page_size; + + xseg->requests = XSEG_MAKE_PTR(segment + size, segment); + size += nr_requests * sizeof(struct xseg_request); + size = __align(size, page_shift); + + xseg->buffers = XSEG_MAKE_PTR(segment + size, segment); + size += nr_requests * cfg->request_size * page_size; + + for (i = 0; i < nr_requests; i++) { + struct xseg_request *req = XSEG_TAKE_PTR(&xseg->requests[i], segment); + /* xseg_allocate() zeroes the segment out */ + req->buffer = xseg->buffers + i * cfg->request_size * page_size; + req->buffersize = cfg->request_size * page_size; + req->data = req->buffer; + req->datasize = req->buffersize; + } + + xseg->ports = XSEG_MAKE_PTR(segment + size, segment); + size += cfg->nr_ports * sizeof(struct xseg_port); + bodysize = nr_requests * sizeof(xqindex); + for (i = 0; i < cfg->nr_ports; i++) { + struct xseg_port *port = XSEG_TAKE_PTR(&xseg->ports[i], segment); + + q = &port->free_queue; + qmem = segment + size; + xq_init_empty(q, nr_requests, qmem); + size += bodysize; + + q = &port->request_queue; + qmem = segment + size; + xq_init_empty(q, nr_requests, qmem); + size += 
bodysize; + + q = &port->reply_queue; + qmem = segment + size; + xq_init_empty(q, nr_requests, qmem); + size += bodysize; + } + size = __align(size, page_shift); + + q = XSEG_TAKE_PTR(xseg->free_requests, segment); + qmem = segment + size; + xq_init_seq(q, nr_requests, nr_requests, qmem); + size += bodysize; + size = __align(size, page_shift); + + xseg->extra = XSEG_MAKE_PTR(segment + size, segment); + size += cfg->extra_size; + size = __align(size, page_shift); + + shared = (struct xseg_shared *)(segment + size); + xseg->shared = XSEG_MAKE_PTR(shared, segment); + shared->flags = 0; + shared->nr_peer_types = 0; + size += sizeof(struct xseg_shared); + size = __align(size, page_shift); + + shared->peer_types = XSEG_MAKE_PTR(segment + size, segment); + size += page_size; + + xseg->segment_size = size; + memcpy(&xseg->config, cfg, sizeof(struct xseg_config)); + return 0; +} + +int xseg_create(struct xseg_config *cfg) +{ + struct xseg *xseg = NULL; + struct xseg_type *type; + struct xseg_operations *xops; + uint64_t size; + long r; + + type = __find_or_load_type(cfg->type); + if (!type) { + cfg->type[XSEG_TNAMESIZE-1] = 0; + LOGMSG("type '%s' does not exist\n", cfg->type); + goto out_err; + } + + size = calculate_segment_size(cfg); + if (!size) { + LOGMSG("invalid config!\n"); + goto out_err; + } + + xops = &type->ops; + cfg->name[XSEG_NAMESIZE-1] = 0; + r = xops->allocate(cfg->name, size); + if (r) { + LOGMSG("cannot allocate segment!\n"); + goto out_err; + } + + xseg = xops->map(cfg->name, size); + if (!xseg) { + LOGMSG("cannot map segment!\n"); + goto out_deallocate; + } + + r = initialize_segment(xseg, cfg); + xops->unmap(xseg, size); + if (r) { + LOGMSG("cannot initilize segment!\n"); + goto out_deallocate; + } + + return 0; + +out_deallocate: + xops->deallocate(cfg->name); +out_err: + return -1; +} + +void xseg_destroy(struct xseg *xseg) +{ + struct xseg_type *type = __find_or_load_type(xseg->config.type); + if (!type) { + LOGMSG("no segment type '%s'\n", 
xseg->config.type); + return; + } + + /* should destroy() leave() first? */ + type->ops.deallocate(xseg->config.name); +} + +/* Check that ptr lies inside [base, base + size). + * The sense is inverted with respect to the name: returns 0 when the pointer + * is inside the range and 1 (after logging the field name) when it is not, + * so that callers can sum the results to count bad pointers. + */ +static int pointer_ok( unsigned long ptr, + unsigned long base, + uint64_t size, + char *name) +{ + int ret = !(ptr >= base && ptr < base + size); + if (ret) + /* arguments follow the order of the conversions: %s, %llx, %llx */ + LOGMSG("invalid pointer '->%s' [%llx on %llx]!\n", + name, + (unsigned long long)ptr, + (unsigned long long)base); + return ret; +} + +#define POINTER_OK(xseg, field, base) \ + pointer_ok( (unsigned long)((xseg)->field), \ + (unsigned long)(base), \ + (xseg)->segment_size, \ + #field) + +/* Returns the number of top-level segment pointers that fall outside + * the mapped segment (0 means all of them are valid). 
+ */ +static int xseg_validate_pointers(struct xseg *xseg) +{ + int r = 0; + r += POINTER_OK(xseg, requests, xseg->segment); + r += POINTER_OK(xseg, free_requests, xseg->segment); + r += POINTER_OK(xseg, ports, xseg->segment); + r += POINTER_OK(xseg, buffers, xseg->segment); + r += POINTER_OK(xseg, extra, xseg->segment); + r += POINTER_OK(xseg, shared, xseg->segment); + return r; +} + +struct xseg *xseg_join(char *typename, char *name) +{ + struct xseg *xseg, *__xseg; + uint64_t size; + struct xseg_type *type; + struct xseg_operations *xops; + int r; + + type = __find_or_load_type(typename); + if (!type) { + LOGMSG("no segment type '%s'\n", typename); + goto err; + } + + xops = &type->ops; + + xseg = xops->malloc(sizeof(struct xseg)); + if (!xseg) { + LOGMSG("cannot allocate memory\n"); + goto err; + } + + __xseg = xops->map(name, XSEG_MIN_PAGE_SIZE); + if (!__xseg) + goto err_seg; + + size = __xseg->segment_size; + LOGMSG("size: %lu\n", (unsigned long)size); + xops->unmap(__xseg, XSEG_MIN_PAGE_SIZE); + + __xseg = xops->map(name, size); + if (!__xseg) + goto err_seg; + + xseg->version = __xseg->version; + xseg->requests = XSEG_TAKE_PTR(__xseg->requests, __xseg); + xseg->free_requests = XSEG_TAKE_PTR(__xseg->free_requests, __xseg); + xseg->ports = XSEG_TAKE_PTR(__xseg->ports, __xseg); + xseg->buffers = XSEG_TAKE_PTR(__xseg->buffers, __xseg); + xseg->extra = XSEG_TAKE_PTR(__xseg->extra, __xseg); + xseg->shared
= XSEG_TAKE_PTR(__xseg->shared, __xseg); + xseg->segment_size = size; + xseg->segment = __xseg; + xseg->type = *type; + xseg->config = __xseg->config; + xseg->max_peer_types = (1 << xseg->config.page_shift) / XSEG_TNAMESIZE; + xseg->peer_types = xops->malloc(sizeof(void *) * xseg->max_peer_types); + if (!xseg->peer_types) + goto err_unmap; + memset(xseg->peer_types, 0, sizeof(void *) * xseg->max_peer_types); + + r =xseg_validate_pointers(xseg); + if (r) { + LOGMSG("found %d invalid xseg pointers!\n", r); + goto err_unmap; + } + return xseg; + +err_unmap: + xops->unmap(__xseg, size); +err_seg: + xops->mfree(xseg); +err: + return NULL; +} + +/* void xseg_leave(struct xseg *xseg) { at least free malloced memory } */ + +int xseg_prepare_wait(struct xseg *xseg, uint32_t portno) +{ + struct xseg_port *port; + if (!__validate_port(xseg, portno)) + return -1; + + port = &xseg->ports[portno]; + return __peer_type.peer_ops.prepare_wait(port); +} + + +int xseg_cancel_wait(struct xseg *xseg, uint32_t portno) +{ + struct xseg_port *port; + if (!__validate_port(xseg, portno)) + return -1; + + port = &xseg->ports[portno]; + return __peer_type.peer_ops.cancel_wait(port); +} + +int xseg_wait_signal(struct xseg *xseg, uint32_t portno, uint32_t usec_timeout) +{ + struct xseg_port *port; + if (!__validate_port(xseg, portno)) + return -1; + + port = &xseg->ports[portno]; + return __peer_type.peer_ops.wait_signal(port, usec_timeout); +} + +int xseg_signal(struct xseg *xseg, uint32_t portno) +{ + struct xseg_peer *type; + struct xseg_port *port; + if (!__validate_port(xseg, portno)) + return -1; + + port = &xseg->ports[portno]; + type = __get_peer_type(xseg, port->peer_type); + if (!type) + return -1; + + return type->peer_ops.signal(port); +} + +int xseg_alloc_requests(struct xseg *xseg, uint32_t portno, uint32_t nr) +{ + struct xseg_port *port; + if (!__validate_port(xseg, portno)) + return -1; + + port = &xseg->ports[portno]; + return xq_head_to_tail(xseg->free_requests, 
&port->free_queue, nr); +} + +/* Move up to nr request slots from the port's free queue back to the + * segment-wide free queue. Returns -1 for an invalid port, otherwise the + * result of xq_head_to_tail(). + */ +int xseg_free_requests(struct xseg *xseg, uint32_t portno, int nr) +{ + struct xseg_port *port; + if (!__validate_port(xseg, portno)) + return -1; + + port = &xseg->ports[portno]; + return xq_head_to_tail(&port->free_queue, xseg->free_requests, nr); +} + +/* Take a request off the head of the port's free queue and tag it with + * portno. Returns NULL if the port is invalid or its free queue is empty. + */ +struct xseg_request *xseg_get_request(struct xseg *xseg, uint32_t portno) +{ + struct xseg_request *req; + struct xseg_port *port; + xqindex xqi; + if (!__validate_port(xseg, portno)) + return NULL; + + port = &xseg->ports[portno]; + xqi = xq_pop_head(&port->free_queue); + if (xqi == None) + return NULL; + + req = &xseg->requests[xqi]; + req->portno = portno; + return req; +} + +/* Give a request back to the free queue of port 'portno', first resetting + * data/datasize to span the whole buffer and clearing name/namesize. + * Returns 0 on success and nonzero if the port is invalid or the queue + * does not accept the request. + */ +int xseg_put_request ( struct xseg *xseg, + uint32_t portno, + struct xseg_request *xreq ) +{ + xqindex xqi; + /* an unchecked portno would index past the ports array */ + if (!__validate_port(xseg, portno)) + return -1; + + xqi = xreq - xseg->requests; + xreq->data = xreq->buffer; + xreq->datasize = xreq->buffersize; + xreq->name = NULL; + xreq->namesize = 0; + return xq_append_head(&xseg->ports[portno].free_queue, xqi) == None; +} + +/* Lay out a request buffer: data starts at the beginning of the buffer + * and the name occupies its last namesize bytes. + * Returns 0 on success, -1 if name and data do not fit in buffersize. 
+ */ +int xseg_prep_request ( struct xseg_request *req, + uint32_t namesize, uint64_t datasize ) +{ + /* compare without adding, so that huge sizes cannot wrap around */ + if (namesize > req->buffersize || + datasize > req->buffersize - namesize) + return -1; + + req->data = req->buffer; + req->name = req->buffer + req->buffersize - namesize; + req->datasize = datasize; + req->namesize = namesize; + return 0; +} + +xserial xseg_submit ( struct xseg *xseg, uint32_t portno, + struct xseg_request *xreq ) +{ + xserial serial = NoSerial; + xqindex xqi; + struct xseg_port *port; + if (!__validate_port(xseg, portno)) + goto out; + + port = &xseg->ports[portno]; + xqi = xreq - xseg->requests; + serial = xq_append_tail(&port->request_queue, xqi); + /* who signals? we do or caller does?
*/ + +out: + return serial; +} + +struct xseg_request *xseg_receive(struct xseg *xseg, uint32_t portno) +{ + xqindex xqi; + struct xseg_port *port; + if (!__validate_port(xseg, portno)) + return NULL; + + port = &xseg->ports[portno]; + xqi = xq_pop_head(&port->reply_queue); + if (xqi == None) + return NULL; + + return xseg->requests + xqi; +} + +struct xseg_request *xseg_accept(struct xseg *xseg, uint32_t portno) +{ + xqindex xqi; + struct xseg_port *port; + if (!__validate_port(xseg, portno)) + return NULL; + + port = &xseg->ports[portno]; + xqi = xq_pop_head(&port->request_queue); + if (xqi == None) + return NULL; + + return xseg->requests + xqi; +} + +xserial xseg_respond ( struct xseg *xseg, uint32_t portno, + struct xseg_request *xreq ) +{ + xserial serial = NoSerial; + xqindex xqi; + struct xseg_port *port; + if (!__validate_port(xseg, portno)) + goto out; + + port = &xseg->ports[portno]; + xqi = xreq - xseg->requests; + serial = xq_append_tail(&port->reply_queue, xqi); + /* who signals? we do? caller does? 
*/ + +out: + return serial; +} + + +/* Bind the calling context to a port and return it, or NULL on failure. + * A 'req' within [0, nr_ports) forces that exact port even if it already + * has an owner; any other value picks the first port without an owner. + * The scan and the driver registration run under the segment lock. + */ +struct xseg_port *xseg_bind_port(struct xseg *xseg, long req) +{ + uint32_t portno, maxno, id = __get_id(), force; + struct xseg_port *port; + + /* maxno is exclusive: only ports 0 .. nr_ports-1 exist */ + if (req < 0 || req >= xseg->config.nr_ports) { + portno = 0; + maxno = xseg->config.nr_ports; + force = 0; + } else { + portno = req; + maxno = req + 1; + force = 1; + } + + __lock_segment(xseg); + for (; portno < maxno; portno++) { + long driver; + port = &xseg->ports[portno]; + if (port->owner && !force) + continue; + /* negative means the driver could not be enabled */ + driver = __enable_driver(xseg, &__peer_type); + if (driver < 0) + break; + port->peer_type = (uint32_t)driver; + port->owner = id; + goto out; + } + port = NULL; +out: + __unlock_segment(xseg); + return port; +} + + +/* Run the built-in driver initializers, then select the peer type named + * _peer_type_name (loading a plugin if needed) as this context's own type. + * Returns 0 on success, -1 if no such peer type can be found. 
+ */ +int xseg_initialize(const char *_peer_type_name) +{ + struct xseg_peer *type; + + __xseg_preinit(); + type = __find_or_load_peer_type(_peer_type_name); + if (!type) { + LOGMSG("Cannot initialize '%s': no driver\n", _peer_type_name); + return -1; + } + __peer_type = *type; + return 0; +} + +#ifdef __KERNEL__ +#include +#include +#endif + diff --git a/xseg/xseg/xseg.h b/xseg/xseg/xseg.h new file mode 100644 index 0000000..c2bcfc9 --- /dev/null +++ b/xseg/xseg/xseg.h @@ -0,0 +1,310 @@ +#ifndef _XSEG_H +#define _XSEG_H + +#ifndef XSEG_VERSION +#define XSEG_VERSION 2011072801 +#endif + +#ifndef XSEG_PAGE_SHIFT +#define XSEG_PAGE_SHIFT 12 +#endif + +#define XSEG_BASE (0x37fd0UL << XSEG_PAGE_SHIFT) +#define XSEG_BASE_AS_PTR ((void *)XSEG_BASE) +#define XSEG_BASE_AS_BUF ((char *)XSEG_BASE) +#define XSEG_OFFSET(base, ptr) ((unsigned long)(ptr) - (unsigned long)(base)) +#define XSEG_PTR_CONVERT(ptr, src, dst) ((void *)((unsigned long)(dst) + XSEG_OFFSET(src, ptr))) +#define XSEG_TAKE_PTR(ptr, base) XSEG_PTR_CONVERT(ptr, XSEG_BASE, base) +#define XSEG_MAKE_PTR(ptr, base) XSEG_PTR_CONVERT(ptr, base, XSEG_BASE) + +/* Request Flags */ +#define XSEG_FLUSH 0 +#define XSEG_FUA 1 +#define XSEG_ALLDATA 2 +#define XSEG_FRESH 3 + +#include +#include + +typedef uint64_t xserial;
+#define NoSerial ((xserial)-1) + +/* Peers and Segments + * + * Segments are memory segments shared among peers. + * Peers are local execution contexts that share a segment. + * + * xseg_type and xseg_peer + * + * A peer needs an xseg_type in order to + * create or access a certain segment type, + * and it needs an xseg_peer in order to + * communicate with a certain type of peer. + * Both segment and peer types are identified by name strings. + * + * Note that each peer (that is, context) type will need + * different code to access the same type of segment or peer. + * Therefore each peer must have its own "customized" version + * of the xseg library. + * + * This is accomplished by mechanisms for registering both + * xseg_type's and xseg_peer's. This can be done both at build time + * and at runtime, through a plugin-loading mechanism (where applicable). + * The plugin namespace serves as both the segment and the peer type namespace, + * so if a segment type has the same name as a peer type, + * they must be provided by the same plugin. + * + * Note also that peers of different types may share the same segment. + * Therefore each peer must know the type of each peer it needs to + * communicate with, and have a driver for it.
+ * +*/ + +struct xseg_port; /* forward declaration: the peer callbacks below take struct xseg_port pointers */ + +struct xseg_operations { /* callback table embedded in struct xseg_type: memory helpers plus name-based segment allocate/deallocate/map/unmap */ + void *(*malloc)(uint64_t size); + void *(*realloc)(void *mem, uint64_t size); + void (*mfree)(void *mem); + long (*allocate)(const char *name, uint64_t size); /* NOTE(review): the posix driver's posix_allocate has this signature and presumably fills this slot - confirm */ + long (*deallocate)(const char *name); + void *(*map)(const char *name, uint64_t size); + void (*unmap)(void *xseg, uint64_t size); +}; + +#define XSEG_NAMESIZE 256 /* size of the xseg_config.name buffer */ +#define XSEG_TNAMESIZE 32 /* size of the type/peer name buffers (xseg_type.name, xseg_peer.name, xseg_config.type, xseg_shared.peer_types rows) */ + +struct xseg_type { /* a named set of segment operations */ + struct xseg_operations ops; + char name[XSEG_TNAMESIZE]; +}; + +struct xseg_peer_operations { /* callback table embedded in struct xseg_peer: signalling setup/teardown, per-port wait/notify hooks, and memory helpers */ + int (*signal_init)(void); + void (*signal_quit)(void); + int (*prepare_wait)(struct xseg_port *port); + int (*cancel_wait)(struct xseg_port *port); + int (*wait_signal)(struct xseg_port *port, uint32_t usec_timeout); /* timeout presumably in microseconds, judging by the parameter name - confirm */ + int (*signal)(struct xseg_port *port); + void *(*malloc)(uint64_t size); + void *(*realloc)(void *mem, uint64_t size); + void (*mfree)(void *mem); +}; + +struct xseg_peer { /* a named set of peer operations */ + struct xseg_peer_operations peer_ops; + char name[XSEG_TNAMESIZE]; +}; + +struct xseg_config { /* segment parameters; also embedded in struct xseg as its config member */ + uint32_t nr_ports; /* max ports (endpoints) supported on segment */ + uint32_t nr_requests; /* how many requests to live on the segment. + requests are globally shared among ports. */ + uint32_t request_size; /* default request buffer size in pages.
+ users may override this temporarily */ + uint64_t extra_size; /* extra memory in pages to allocate beyond + overhead and request buffers */ + uint32_t page_shift; /* the alignment unit */ + char type[XSEG_TNAMESIZE]; /* zero-terminated identifier */ + char name[XSEG_NAMESIZE]; /* zero-terminated identifier */ +}; + +struct xseg; + +struct xseg_port { /* per-port state: three xq queues (free, request, reply) followed by the owner, waitcue and peer_type fields */ + struct xq free_queue; + struct xq request_queue; + struct xq reply_queue; + unsigned long owner; + volatile long waitcue; + uint32_t peer_type; +}; + +struct xseg_request; + +struct xseg_task { /* ties a request pointer to an epoch and a list of xqindex dependencies */ + uint64_t epoch; + struct xseg_request *req; + xqindex *deps; + xqindex nr_deps; + xqindex __alloced_deps; /* NOTE(review): identifiers containing a double underscore are reserved for the implementation in C */ +}; + +/* OPS */ /* operation codes; presumably the values stored in xseg_request.op - confirm */ +#define X_PING 0 +#define X_READ 1 +#define X_WRITE 2 +#define X_SYNC 3 +#define X_TRUNCATE 4 +#define X_DELETE 5 +#define X_ACQUIRE 6 +#define X_RELEASE 7 +#define X_COPY 8 +#define X_CLONE 9 +#define X_COMMIT 10 + +/* FLAGS */ /* independent single-bit masks (bits 0-2) */ +#define XF_NOSYNC (1 << 0) +#define XF_FLUSH (1 << 1) +#define XF_FUA (1 << 2) + +/* STATES */ /* XS_SERVED and XS_ERROR are single-bit masks (bits 0 and 1) */ +#define XS_SERVED (1 << 0) +#define XS_ERROR (1 << 1) + /* the three values below are 1, 2 and 3 placed in bits 2-3, not independent bits: XS_FINISHED == (XS_ACCEPTED | XS_PENDING), so a plain "state & XS_ACCEPTED" test also matches XS_FINISHED */ +#define XS_ACCEPTED (1 << 2) +#define XS_PENDING (2 << 2) +#define XS_FINISHED (3 << 2) + +struct xseg_request { /* one request record; the xseg_get_request/xseg_submit/xseg_respond prototypes below pass pointers to it */ + xserial serial; + uint64_t offset; + uint64_t size; + uint64_t serviced; + char *data; + uint64_t datasize; + char *name; /* target name */ + uint32_t namesize; + uint32_t op; + uint32_t state; + uint32_t flags; + uint32_t portno; + /* pad */ + char *buffer; + uint64_t buffersize; + xqindex task; + void *priv; +}; + +struct xseg_shared { + unsigned long flags; + char (*peer_types)[XSEG_TNAMESIZE]; /* pointer to rows of XSEG_TNAMESIZE chars, i.e. a table of fixed-size name buffers */ + uint32_t nr_peer_types; +}; + +struct xseg { /* segment descriptor: version, pointers to the requests/free_requests/ports/buffers/extra/shared areas, the segment size, embedded type and config, and a table of peer pointers */ + uint64_t version; + struct xseg_request *requests; + struct xq *free_requests; + struct xseg_port *ports; + char *buffers; + char *extra; + struct xseg_shared *shared; + uint64_t segment_size; + struct xseg *segment; + struct xseg_type type; + struct xseg_config config; + struct xseg_peer **peer_types; + uint32_t max_peer_types; +}; + +#define XSEG_F_LOCK 0x1 /* NOTE(review): presumably a bit for xseg_shared.flags - confirm against the implementation */ + +/*
================= XSEG REQUEST INTERFACE ================================= */ +/* ___________________ _________ */ +/* / \ / \ */ + int xseg_initialize ( const char * peer_type ); + + int xseg_parse_spec ( char * spec, + struct xseg_config * config ); + + struct xseg_port * xseg_bind_port ( struct xseg * xseg, + long portno ); + + static uint32_t xseg_portno ( struct xseg * xseg, + struct xseg_port * port ); /* defined as static inline at the end of this header */ +/* \___________________/ \_________/ */ +/* ___________________ _________ */ +/* / \ / \ */ + int xseg_register_type ( struct xseg_type * type ); + int xseg_unregister_type ( const char * name ); + + int xseg_register_peer ( struct xseg_peer * peer ); + int xseg_unregister_peer ( const char * name ); + + void xseg_report_peer_types( void ); + + long xseg_enable_driver ( struct xseg * xseg, + const char * name ); + int xseg_disable_driver ( struct xseg * xseg, + const char * name ); +/* \___________________/ \_________/ */ +/* ___________________ _________ */ +/* / \ / \ */ + int xseg_create ( struct xseg_config * cfg ); + + void xseg_destroy ( struct xseg * xseg ); +/* \___________________/ \_________/ */ +/* ___________________ _________ */ +/* / \ / \ */ + struct xseg * xseg_join ( char * typename, + char * name ); + + void xseg_leave ( struct xseg * xseg ); +/* \___________________/ \_________/ */ +/* ___________________ _________ */ +/* / \ / \ */ + int xseg_alloc_requests ( struct xseg * xseg, + uint32_t portno, + uint32_t nr ); + + int xseg_free_requests ( struct xseg * xseg, + uint32_t portno, + int nr ); /* NOTE(review): nr is int here but uint32_t in xseg_alloc_requests - confirm the asymmetry is intended */ + +struct xseg_request * xseg_get_request ( struct xseg * xseg, + uint32_t portno ); + + int xseg_put_request ( struct xseg * xseg, + uint32_t portno, + struct xseg_request * xreq ); + + int xseg_prep_request ( struct xseg_request * xreq, + uint32_t namesize, + uint64_t datasize ); +/* \___________________/ \_________/ */ +/* ___________________ _________ */ +/* / \ / \ */ + xserial xseg_submit ( struct xseg * xseg, + uint32_t portno, + struct
xseg_request * xreq ); + +struct xseg_request * xseg_receive ( struct xseg * xseg, + uint32_t portno ); +/* \___________________/ \_________/ */ +/* ___________________ _________ */ +/* / \ / \ */ + +struct xseg_request * xseg_accept ( struct xseg * xseg, + uint32_t portno ); + + xserial xseg_respond ( struct xseg * xseg, + uint32_t portno, + struct xseg_request * xreq ); +/* \___________________/ \_________/ */ +/* ___________________ _________ */ +/* / \ / \ */ + int xseg_prepare_wait ( struct xseg * xseg, + uint32_t portno ); + + int xseg_cancel_wait ( struct xseg * xseg, + uint32_t portno ); + + int xseg_wait_signal ( struct xseg * xseg, + uint32_t portno, + uint32_t utimeout ); + + int xseg_signal ( struct xseg * xseg, + uint32_t portno ); +/* \___________________/ \_________/ */ +/* */ +/* ================= XSEG REQUEST INTERFACE ================================= */ + + +static inline uint32_t xseg_portno(struct xseg *xseg, struct xseg_port *port) +{ /* returns the pointer difference port - xseg->ports converted to uint32_t, i.e. the element offset of port from the start of xseg->ports; assumes port points into that array - no bounds check is done here */ + return port - xseg->ports; +} + +#endif diff --git a/xseg/xseg/xseg_exports.h b/xseg/xseg/xseg_exports.h new file mode 100644 index 0000000..aafae25 --- /dev/null +++ b/xseg/xseg/xseg_exports.h @@ -0,0 +1,26 @@ +EXPORT_SYMBOL(xseg_initialize); /* one EXPORT_SYMBOL entry per non-static function declared in xseg.h */ +EXPORT_SYMBOL(xseg_parse_spec); +EXPORT_SYMBOL(xseg_register_type); +EXPORT_SYMBOL(xseg_unregister_type); +EXPORT_SYMBOL(xseg_register_peer); +EXPORT_SYMBOL(xseg_unregister_peer); +EXPORT_SYMBOL(xseg_report_peer_types); +EXPORT_SYMBOL(xseg_enable_driver); +EXPORT_SYMBOL(xseg_disable_driver); +EXPORT_SYMBOL(xseg_create); +EXPORT_SYMBOL(xseg_destroy); +EXPORT_SYMBOL(xseg_join); /* NOTE(review): xseg_leave is declared in xseg.h but has no EXPORT_SYMBOL entry in this list - confirm this is intentional */ +EXPORT_SYMBOL(xseg_bind_port); +EXPORT_SYMBOL(xseg_alloc_requests); +EXPORT_SYMBOL(xseg_free_requests); +EXPORT_SYMBOL(xseg_get_request); +EXPORT_SYMBOL(xseg_put_request); +EXPORT_SYMBOL(xseg_prep_request); +EXPORT_SYMBOL(xseg_submit); +EXPORT_SYMBOL(xseg_receive); +EXPORT_SYMBOL(xseg_accept); +EXPORT_SYMBOL(xseg_respond); +EXPORT_SYMBOL(xseg_prepare_wait); +EXPORT_SYMBOL(xseg_cancel_wait);
+EXPORT_SYMBOL(xseg_wait_signal); +EXPORT_SYMBOL(xseg_signal);