From: Filippos Giannakos
Date: Mon, 29 Apr 2013 15:40:25 +0000 (+0300)
Subject: Add workq xtype implementation
X-Git-Url: https://code.grnet.gr/git/archipelago/commitdiff_plain/fb837abe4cfc273448fd33da9a13a2496f985a9a

Add workq xtype implementation

[review] This diff was re-flowed from a whitespace-collapsed extraction and
amended, so the blob hashes on the "index" lines predate the amendments.
Changes to the added code:
  - __xworkq_enqueue: xq_free() the old queue after a resize (storage leak).
  - xworkq_enqueue: free the work item when it cannot be queued.
  - xworkq_destroy: free work items that are still queued.
  - xworkq_init: do not leave wq->q dangling on failure.
  - xworkq_signal: drop the unused "out" label.
  - xworkq_test: signal the queue so jobs actually run, validate argv,
    share the duplicated test2/test3 body, fix allocation and thread cleanup.
The #include targets were lost in the extraction; the ones in this diff are
best guesses and are marked NOTE(review) where they appear.
---

diff --git a/xseg/sys/Makefile b/xseg/sys/Makefile
index abb63b7..e3ee34a 100644
--- a/xseg/sys/Makefile
+++ b/xseg/sys/Makefile
@@ -6,6 +6,7 @@ XTYPES = xq
 XTYPES += xpool
 XTYPES += xhash
 XTYPES += xheap
+XTYPES += xworkq
 XTYPES += xobj
 
 export XTYPES
diff --git a/xseg/sys/kernel/Makefile b/xseg/sys/kernel/Makefile
index 30aea20..e6e4ef7 100644
--- a/xseg/sys/kernel/Makefile
+++ b/xseg/sys/kernel/Makefile
@@ -77,6 +77,9 @@ xheap.k.c: $(BASE)/xtypes/xheap.c $(BASE)/xtypes/xheap.h
 xobj.k.c: $(BASE)/xtypes/xobj.c $(BASE)/xtypes/xobj.h
 	ln -sf $< $@
 
+xworkq.k.c: $(BASE)/xtypes/xworkq.c $(BASE)/xtypes/xworkq.h
+	ln -sf $< $@
+
 xseg.k.c: $(BASE)/xseg/xseg.c $(BASE)/xseg/xseg.h
 	ln -sf $< $@
 
diff --git a/xseg/sys/user/Makefile b/xseg/sys/user/Makefile
index 02b8fd4..500f923 100644
--- a/xseg/sys/user/Makefile
+++ b/xseg/sys/user/Makefile
@@ -124,6 +124,12 @@ xobj/xobj.o:
 xobj/xobj.pic.o:
 	make -C xobj xobj.pic.o
 
+xworkq/xworkq.o:
+	make -C xworkq xworkq.o
+
+xworkq/xworkq.pic.o:
+	make -C xworkq xworkq.pic.o
+
 xseg_user.o: xseg_user.c
 	$(CC) $(CFLAGS) $(INC) -Wall -O2 -finline-functions -fPIC -c -o $@ $<
 
diff --git a/xseg/sys/user/xworkq/Makefile b/xseg/sys/user/xworkq/Makefile
new file mode 100644
index 0000000..89cd98e
--- /dev/null
+++ b/xseg/sys/user/xworkq/Makefile
@@ -0,0 +1,73 @@
+# Copyright 2012 GRNET S.A. All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or
+# without modification, are permitted provided that the following
+# conditions are met:
+#
+#   1. Redistributions of source code must retain the above
+#      copyright notice, this list of conditions and the following
+#      disclaimer.
+#   2. Redistributions in binary form must reproduce the above
+#      copyright notice, this list of conditions and the following
+#      disclaimer in the documentation and/or other materials
+#      provided with the distribution.
+#
+# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
+# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
+# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
+# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
+# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
+# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
+# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
+# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+# POSSIBILITY OF SUCH DAMAGE.
+#
+# The views and conclusions contained in the software and
+# documentation are those of the authors and should not be
+# interpreted as representing official policies, either expressed
+# or implied, of GRNET S.A.
+#
+
+.PHONY: default all clean install install-src
+
+include $(XSEG_HOME)/base.mk
+
+DEBUG=-g
+
+FILES="Makefile"
+#FILES+=$(shell ls *.h)
+#FILES+=$(shell ls *.c)
+
+SUBDIR:=$(subst $(XSEG_HOME),,$(CURDIR))
+
+default: all
+
+all: xworkq.o xworkq.pic.o xworkq_test
+
+$(BASE)/sys/user/xseg_user.o:
+	make -C $(BASE)/sys/user xseg_user.o
+
+xworkq_test: $(BASE)/xtypes/xworkq_test.c xworkq.o $(BASE)/sys/user/xq/xq.o $(BASE)/sys/user/xseg_user.o
+	$(CC) $(CFLAGS) $(INC) -L$(LIB) -o $@ $< xworkq.o \
+		$(BASE)/sys/user/xseg_user.o $(BASE)/sys/user/xq/xq.o \
+		-ldl -lpthread
+
+xworkq.o: $(BASE)/xtypes/xworkq.c $(BASE)/xtypes/xworkq.h $(BASE)/xtypes/xwork.h $(BASE)/xtypes/xlock.h
+	$(CC) $(CFLAGS) $(INC) -c -o $@ $<
+
+xworkq.pic.o: $(BASE)/xtypes/xworkq.c $(BASE)/xtypes/xworkq.h $(BASE)/xtypes/xwork.h $(BASE)/xtypes/xlock.h
+	$(CC) $(CFLAGS) $(INC) -fPIC -c -o $@ $<
+
+clean:
+	rm -f xworkq.o xworkq.pic.o xworkq_test
+
+install:
+
+install-src:
+	install -d $(DESTDIR)$(srcdir)$(SUBDIR) ;
+	@for f in $(FILES) ; do \
+		install -o 0 -g 0 -m 644 -t $(DESTDIR)$(srcdir)$(SUBDIR) $$f ; \
+	done
diff --git a/xseg/xtypes/xwork.h b/xseg/xtypes/xwork.h
new file mode 100644
index 0000000..d6e0c91
--- /dev/null
+++ b/xseg/xtypes/xwork.h
@@ -0,0 +1,14 @@
+#ifndef __XWORK_H
+#define __XWORK_H
+
+/*
+ * A unit of deferred work: job_fn is invoked as job_fn(q, job), where q is
+ * the work queue that runs the item and job is the caller-supplied argument.
+ */
+struct work {
+	void *job;
+	void (*job_fn)(void *q, void *arg);
+};
+
+#endif /* __XWORK_H */
+
diff --git a/xseg/xtypes/xworkq.c b/xseg/xtypes/xworkq.c
new file mode 100644
index 0000000..8d4d016
--- /dev/null
+++ b/xseg/xtypes/xworkq.c
@@ -0,0 +1,165 @@
+/*
+ * NOTE(review): the targets of the two #include directives below were lost
+ * when this patch was extracted; the paths are a best guess and must be
+ * confirmed against the tree.
+ */
+#include <xtypes/xworkq.h>
+#include <xtypes/domain.h>
+
+/*
+ * Initialize the work queue wq.
+ *
+ * lock is stored as-is and may be NULL; when set, xworkq_signal() only runs
+ * jobs while it holds it. flags is recorded but not interpreted here.
+ *
+ * Returns 0 on success, -1 if the backing queue could not be allocated.
+ */
+int xworkq_init(struct xworkq *wq, struct xlock *lock, uint32_t flags)
+{
+	wq->lock = lock;
+	wq->flags = flags;
+	/* Presumably leaves the embedded lock unlocked; no init call is visible. */
+	xlock_release(&wq->q_lock);
+	wq->q = xtypes_malloc(sizeof(struct xq));
+	if (!wq->q)
+		return -1;
+	if (!xq_alloc_empty(wq->q, 8)) {
+		xtypes_free(wq->q);
+		/* Do not leave a dangling pointer behind for xworkq_destroy(). */
+		wq->q = NULL;
+		return -1;
+	}
+	return 0;
+}
+
+/*
+ * Tear down wq and release its backing queue.
+ *
+ * Work items that are still queued are freed without being run so that they
+ * do not leak; their job payloads stay owned by whoever enqueued them.
+ * No lock is taken here.
+ */
+void xworkq_destroy(struct xworkq *wq)
+{
+	xqindex xqi;
+
+	if (!wq->q)
+		return;
+
+	while ((xqi = __xq_pop_head(wq->q)) != Noneidx)
+		xtypes_free((struct work *)xqi);
+
+	xq_free(wq->q);
+	xtypes_free(wq->q);
+	wq->q = NULL;
+}
+
+/*
+ * Append w to the backing queue, doubling the queue when the append fails.
+ *
+ * Runs entirely under wq->q_lock. Returns 0 on success and -1 when w could
+ * not be queued, in which case w still belongs to the caller.
+ */
+int __xworkq_enqueue(struct xworkq *wq, struct work *w)
+{
+	xqindex r;
+	struct xq *newq;
+
+	xlock_acquire(&wq->q_lock, 4);
+	r = __xq_append_tail(wq->q, (xqindex)w);
+	if (r != Noneidx)
+		goto out;
+
+	/* No room left (presumably): move everything to a queue twice as big. */
+	newq = xtypes_malloc(sizeof(struct xq));
+	if (!newq)
+		goto out;
+	if (!xq_alloc_empty(newq, wq->q->size * 2)) {
+		xtypes_free(newq);
+		goto out;
+	}
+	if (__xq_resize(wq->q, newq) == Noneidx) {
+		xq_free(newq);
+		xtypes_free(newq);
+		goto out;
+	}
+	/*
+	 * The old queue was set up with xq_alloc_empty() as well, so it needs
+	 * xq_free() before its struct is released, like every other queue in
+	 * this file. Freeing only the struct leaked its storage on each resize.
+	 */
+	xq_free(wq->q);
+	xtypes_free(wq->q);
+	wq->q = newq;
+	r = __xq_append_tail(wq->q, (xqindex)w);
+out:
+	xlock_release(&wq->q_lock);
+
+	return ((r == Noneidx) ? -1 : 0);
+}
+
+/*
+ * Queue job_fn(wq, job) for a later xworkq_signal().
+ *
+ * Returns 0 on success, -1 if the work item could not be allocated or queued.
+ */
+int xworkq_enqueue(struct xworkq *wq, void (*job_fn)(void *q, void *arg), void *job)
+{
+	//maybe use xobj
+	struct work *work = xtypes_malloc(sizeof(struct work));
+	if (!work)
+		return -1;
+	work->job_fn = job_fn;
+	work->job = job;
+	if (__xworkq_enqueue(wq, work) < 0) {
+		/* Nobody else references the item; free it instead of leaking. */
+		xtypes_free(work);
+		return -1;
+	}
+	return 0;
+}
+
+/*
+ * Run queued work items on the calling thread.
+ *
+ * When wq->lock is set, jobs only run while it is held; if it cannot be taken
+ * with xlock_try_lock() the function returns at once. The outer loop
+ * re-checks the queue after dropping the lock, so items enqueued while the
+ * last batch was finishing are picked up too.
+ *
+ * NOTE(review): xq_count(wq->q) is read without q_lock while
+ * __xworkq_enqueue() may replace and free wq->q; confirm whether callers
+ * can signal and enqueue concurrently.
+ */
+void xworkq_signal(struct xworkq *wq)
+{
+	xqindex xqi;
+	struct work *w;
+
+	while (xq_count(wq->q)) {
+		if (wq->lock && !xlock_try_lock(wq->lock, 2))
+			return;
+
+		xlock_acquire(&wq->q_lock, 3);
+		xqi = __xq_pop_head(wq->q);
+		xlock_release(&wq->q_lock);
+
+		while (xqi != Noneidx) {
+			w = (struct work *)xqi;
+			w->job_fn(wq, w->job);
+			xtypes_free(w);
+			xlock_acquire(&wq->q_lock, 3);
+			xqi = __xq_pop_head(wq->q);
+			xlock_release(&wq->q_lock);
+		}
+
+		if (wq->lock)
+			xlock_release(wq->lock);
+	}
+}
+
+#ifdef __KERNEL__
+/* NOTE(review): include targets lost in extraction; best guess, confirm. */
+#include <linux/module.h>
+#include <xtypes/xworkq_exports.h>
+#endif
diff --git a/xseg/xtypes/xworkq.h b/xseg/xtypes/xworkq.h
new file mode 100644
index 0000000..4937f62
--- /dev/null
+++ b/xseg/xtypes/xworkq.h
@@ -0,0 +1,29 @@
+#ifndef __X_WORK_H
+#define __X_WORK_H
+
+/*
+ * NOTE(review): the targets of the two #include directives below were lost
+ * when this patch was extracted; the paths are a best guess and must be
+ * confirmed against the tree.
+ */
+#include <xtypes/xq.h>
+#include <xtypes/xwork.h>
+
+/* Queue of deferred work items (struct work) run by xworkq_signal(). */
+struct xworkq {
+	struct xlock q_lock;	/* held around every append to / pop from q */
+	uint32_t flags;		/* stored by xworkq_init(), not read by xworkq.c */
+	struct xq *q;		/* backing queue; entries are struct work pointers */
+	struct xlock *lock;	/* optional (may be NULL); held while jobs run */
+};
+
+/* Set up wq; returns 0 on success, -1 on allocation failure. */
+int xworkq_init(struct xworkq *wq, struct xlock * lock, uint32_t flags);
+/* Queue job_fn(wq, job); returns 0 on success, -1 on failure. */
+int xworkq_enqueue(struct xworkq *wq, void (*job_fn)(void *q, void *arg), void *job);
+/* Run queued jobs, unless wq->lock is set and cannot be taken. */
+void xworkq_signal(struct xworkq *wq);
+/* Release the queue owned by wq. */
+void xworkq_destroy(struct xworkq *wq);
+
+#endif /* __X_WORK_H */
diff --git a/xseg/xtypes/xworkq_exports.h b/xseg/xtypes/xworkq_exports.h
new file mode 100644
index 0000000..ae954ac
--- /dev/null
+++ b/xseg/xtypes/xworkq_exports.h
@@ -0,0 +1,38 @@
+/*
+ * Copyright 2013 GRNET S.A. All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or
+ * without modification, are permitted provided that the following
+ * conditions are met:
+ *
+ *   1. Redistributions of source code must retain the above
+ *      copyright notice, this list of conditions and the following
+ *      disclaimer.
+ *   2. Redistributions in binary form must reproduce the above
+ *      copyright notice, this list of conditions and the following
+ *      disclaimer in the documentation and/or other materials
+ *      provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
+ * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+ * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
+ * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
+ * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
+ * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
+ * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
+ * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
+ * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+ * POSSIBILITY OF SUCH DAMAGE.
+ *
+ * The views and conclusions contained in the software and
+ * documentation are those of the authors and should not be
+ * interpreted as representing official policies, either expressed
+ * or implied, of GRNET S.A.
+ */
+
+EXPORT_SYMBOL(xworkq_init);
+EXPORT_SYMBOL(xworkq_destroy);
+EXPORT_SYMBOL(xworkq_enqueue);
+EXPORT_SYMBOL(xworkq_signal);
diff --git a/xseg/xtypes/xworkq_test.c b/xseg/xtypes/xworkq_test.c
new file mode 100644
index 0000000..61d2f9a
--- /dev/null
+++ b/xseg/xtypes/xworkq_test.c
@@ -0,0 +1,193 @@
+#include "xworkq.h"
+#include <stdio.h>
+#include <stdlib.h>
+#include <pthread.h>
+#include <sys/time.h>
+/*
+ * NOTE(review): the targets of the five system #include directives of this
+ * file were lost when the patch was extracted. The four above are the ones
+ * the code visibly needs; the fifth could not be recovered.
+ */
+
+unsigned long sum = 0;
+struct xlock lock;
+
+/* Job body shared by all tests: add the integer carried in arg to sum. */
+void jobfn(void *q, void *arg)
+{
+	unsigned long c = (unsigned long) arg;
+	sum += c;
+}
+
+/*
+ * Single-threaded test: enqueue n jobs that each add 1, run them and check
+ * the total. Returns 0 on success, -1 on failure.
+ */
+int test1(unsigned long n)
+{
+	struct xworkq wq;
+	unsigned long i;
+
+	if (xworkq_init(&wq, &lock, 0) < 0)
+		return -1;
+	sum = 0;
+	xlock_release(&lock);
+
+	for (i = 0; i < n; i++) {
+		xworkq_enqueue(&wq, jobfn, (void *)1);
+	}
+
+	/* Enqueueing only queues the jobs; they run when the queue is signalled. */
+	xworkq_signal(&wq);
+	xworkq_destroy(&wq);
+
+	return ((sum == n)? 0 : -1);
+}
+
+struct thread_arg{
+	struct xworkq *wq;
+	unsigned long n;	/* number of jobs the thread enqueues */
+	unsigned long num;	/* amount each of those jobs adds to sum */
+};
+
+/* Thread body: enqueue targ->n jobs that each add targ->num to sum. */
+void *thread_test(void *arg)
+{
+	struct thread_arg *targ = (struct thread_arg *)arg;
+	unsigned long n = targ->n;
+	unsigned long i;
+
+	for (i = 0; i < n; i++) {
+		xworkq_enqueue(targ->wq, jobfn, (void *)targ->num);
+	}
+
+	return NULL;
+}
+
+/*
+ * Shared body of test2 and test3: nr_threads threads enqueue n jobs each
+ * (thread i adds i + 1 per job), then the jobs are run from this thread
+ * and the total is checked. Returns 0 on success, -1 on failure.
+ */
+static int run_threaded_test(unsigned long n, unsigned long nr_threads)
+{
+	unsigned long i, started = 0, expected_sum = 0;
+	int r, ret = -1;
+	struct xworkq wq;
+	struct thread_arg *targs;
+	pthread_t *threads;
+
+	if (xworkq_init(&wq, &lock, 0) < 0)
+		return -1;
+	sum = 0;
+	xlock_release(&lock);
+
+	/* One slot per thread is enough; the old code asked for n times that. */
+	targs = calloc(nr_threads, sizeof(*targs));
+	threads = calloc(nr_threads, sizeof(*threads));
+	if (nr_threads && (!targs || !threads))
+		goto out;
+
+	for (i = 0; i < nr_threads; i++) {
+		targs[i].num = i+1;
+		targs[i].n = n;
+		targs[i].wq = &wq;
+	}
+	for (i = 0; i < nr_threads; i++) {
+		r = pthread_create(&threads[i], NULL, thread_test, &targs[i]);
+		if (r) {
+			fprintf(stderr, "error pthread_create\n");
+			break;
+		}
+		started++;
+	}
+
+	/* Join whatever was started before wq and targs go away. */
+	for (i = 0; i < started; i++) {
+		pthread_join(threads[i], NULL);
+	}
+	if (started < nr_threads)
+		goto out;
+
+	/* Enqueueing only queues the jobs; they run when the queue is signalled. */
+	xworkq_signal(&wq);
+
+	for (i = 0; i < nr_threads; i++) {
+		expected_sum += n*(i+1);
+	}
+	ret = (sum == expected_sum) ? 0 : -1;
+out:
+	free(targs);
+	free(threads);
+	xworkq_destroy(&wq);
+
+	return ret;
+}
+
+int test2(unsigned long n, unsigned long nr_threads)
+{
+	return run_threaded_test(n, nr_threads);
+}
+
+/* NOTE(review): test3 was a verbatim copy of test2; confirm what it should cover. */
+int test3(unsigned long n, unsigned long nr_threads)
+{
+	return run_threaded_test(n, nr_threads);
+}
+
+int main(int argc, const char *argv[])
+{
+	struct timeval start, end, tv;
+	int r;
+	int n, t;
+
+	if (argc < 3) {
+		fprintf(stderr, "Usage: %s <nr_jobs> <nr_threads>\n", argv[0]);
+		return -1;
+	}
+	n = atoi(argv[1]);
+	t = atoi(argv[2]);
+	/* Negative counts would turn into huge unsigned values in the tests. */
+	if (n < 0 || t < 0) {
+		fprintf(stderr, "nr_jobs and nr_threads must not be negative\n");
+		return -1;
+	}
+
+	fprintf(stderr, "Running test1\n");
+	gettimeofday(&start, NULL);
+	r = test1(n);
+	if (r < 0){
+		fprintf(stderr, "Test1: FAILED\n");
+		return -1;
+	}
+	gettimeofday(&end, NULL);
+	timersub(&end, &start, &tv);
+	fprintf(stderr, "Test1: PASSED\n");
+	fprintf(stderr, "Test time: %ds %dusec\n\n", (int)tv.tv_sec, (int)tv.tv_usec);
+
+	fprintf(stderr, "running test2\n");
+	gettimeofday(&start, NULL);
+	r = test2(n, t);
+	if (r < 0){
+		fprintf(stderr, "test2: failed\n");
+		return -1;
+	}
+	gettimeofday(&end, NULL);
+	fprintf(stderr, "test2: passed\n");
+	timersub(&end, &start, &tv);
+	fprintf(stderr, "Test time: %ds %dusec\n\n", (int)tv.tv_sec, (int)tv.tv_usec);
+
+	fprintf(stderr, "running test3\n");
+	gettimeofday(&start, NULL);
+	r = test3(n, t);
+	if (r < 0){
+		fprintf(stderr, "test3: failed\n");
+		return -1;
+	}
+	gettimeofday(&end, NULL);
+	fprintf(stderr, "test3: passed\n");
+	timersub(&end, &start, &tv);
+	fprintf(stderr, "Test time: %ds %dusec\n\n", (int)tv.tv_sec, (int)tv.tv_usec);
+
+	return 0;
+}