Add workq xtype implementation
author Filippos Giannakos <philipgian@grnet.gr>
Mon, 29 Apr 2013 15:40:25 +0000 (18:40 +0300)
committer Filippos Giannakos <philipgian@grnet.gr>
Wed, 5 Jun 2013 12:17:01 +0000 (15:17 +0300)
xseg/sys/Makefile
xseg/sys/kernel/Makefile
xseg/sys/user/Makefile
xseg/sys/user/xworkq/Makefile [new file with mode: 0644]
xseg/xtypes/xwork.h [new file with mode: 0644]
xseg/xtypes/xworkq.c [new file with mode: 0644]
xseg/xtypes/xworkq.h [new file with mode: 0644]
xseg/xtypes/xworkq_exports.h [new file with mode: 0644]
xseg/xtypes/xworkq_test.c [new file with mode: 0644]

index abb63b7..e3ee34a 100644 (file)
@@ -6,6 +6,7 @@ XTYPES = xq
 XTYPES += xpool
 XTYPES += xhash
 XTYPES += xheap
+XTYPES += xworkq
 XTYPES += xobj
 
 export XTYPES
index 30aea20..e6e4ef7 100644 (file)
@@ -77,6 +77,9 @@ xheap.k.c: $(BASE)/xtypes/xheap.c $(BASE)/xtypes/xheap.h
 xobj.k.c: $(BASE)/xtypes/xobj.c $(BASE)/xtypes/xobj.h
        ln -sf $< $@
 
+xworkq.k.c: $(BASE)/xtypes/xworkq.c $(BASE)/xtypes/xworkq.h
+       ln -sf $< $@
+
 xseg.k.c: $(BASE)/xseg/xseg.c $(BASE)/xseg/xseg.h
        ln -sf $< $@
 
index 02b8fd4..500f923 100644 (file)
@@ -124,6 +124,12 @@ xobj/xobj.o:
 xobj/xobj.pic.o:
        make -C xobj xobj.pic.o
 
+xworkq/xworkq.o:
+       make -C xworkq xworkq.o
+
+xworkq/xworkq.pic.o:
+       make -C xworkq xworkq.pic.o
+
 xseg_user.o: xseg_user.c
        $(CC) $(CFLAGS) $(INC) -Wall -O2 -finline-functions -fPIC -c -o $@ $<
 
diff --git a/xseg/sys/user/xworkq/Makefile b/xseg/sys/user/xworkq/Makefile
new file mode 100644 (file)
index 0000000..89cd98e
--- /dev/null
@@ -0,0 +1,73 @@
+# Copyright 2012 GRNET S.A. All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or
+# without modification, are permitted provided that the following
+# conditions are met:
+#
+#   1. Redistributions of source code must retain the above
+#      copyright notice, this list of conditions and the following
+#      disclaimer.
+#   2. Redistributions in binary form must reproduce the above
+#      copyright notice, this list of conditions and the following
+#      disclaimer in the documentation and/or other materials
+#      provided with the distribution.
+#
+# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
+# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
+# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
+# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
+# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
+# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
+# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
+# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+# POSSIBILITY OF SUCH DAMAGE.
+#
+# The views and conclusions contained in the software and
+# documentation are those of the authors and should not be
+# interpreted as representing official policies, either expressed
+# or implied, of GRNET S.A.
+#
+
+.PHONY: default all clean install install-src
+
+include $(XSEG_HOME)/base.mk
+
+DEBUG=-g
+
+FILES="Makefile"
+#FILES+=$(shell ls *.h)
+#FILES+=$(shell ls *.c)
+
+SUBDIR:=$(subst $(XSEG_HOME),,$(CURDIR))
+
+default: all
+
+all: xworkq.o xworkq.pic.o xworkq_test
+
+# xseg_user.o lives in the parent directory; delegate to its Makefile.
+# $(MAKE) (rather than a literal "make") keeps -j/-n/-k and the jobserver
+# working across the recursive invocation.
+$(BASE)/sys/user/xseg_user.o:
+       $(MAKE) -C $(BASE)/sys/user xseg_user.o
+
+xworkq_test: $(BASE)/xtypes/xworkq_test.c xworkq.o $(BASE)/sys/user/xq/xq.o $(BASE)/sys/user/xseg_user.o
+       $(CC) $(CFLAGS) $(INC) -L$(LIB) -o $@ $< xworkq.o \
+       $(BASE)/sys/user/xseg_user.o $(BASE)/sys/user/xq/xq.o \
+       -ldl -lpthread
+
+xworkq.o: $(BASE)/xtypes/xworkq.c $(BASE)/xtypes/xworkq.h  $(BASE)/xtypes/xwork.h $(BASE)/xtypes/xlock.h
+       $(CC) $(CFLAGS) $(INC) -c -o $@ $<
+
+xworkq.pic.o: $(BASE)/xtypes/xworkq.c $(BASE)/xtypes/xworkq.h $(BASE)/xtypes/xwork.h $(BASE)/xtypes/xlock.h
+       $(CC) $(CFLAGS) $(INC) -fPIC -c -o $@ $<
+
+clean:
+       rm -f xworkq.o xworkq.pic.o xworkq_test
+
+install:
+
+install-src:
+       install -d $(DESTDIR)$(srcdir)$(SUBDIR) ;
+       @for f in $(FILES) ; do \
+               install -o 0 -g 0 -m 644 -t $(DESTDIR)$(srcdir)$(SUBDIR) $$f ; \
+       done
diff --git a/xseg/xtypes/xwork.h b/xseg/xtypes/xwork.h
new file mode 100644 (file)
index 0000000..d6e0c91
--- /dev/null
@@ -0,0 +1,10 @@
+#ifndef __XWORK_H
+#define __XWORK_H
+
+struct work {  /* one deferred job: a callback plus its argument */
+       void *job;  /* opaque argument; handed to job_fn as its 'arg' parameter */
+       void (*job_fn)(void *q, void *arg);  /* callback; xworkq_signal() calls it with the work queue as 'q' */
+};
+
+#endif /* __XWORK_H */
+
diff --git a/xseg/xtypes/xworkq.c b/xseg/xtypes/xworkq.c
new file mode 100644 (file)
index 0000000..8d4d016
--- /dev/null
@@ -0,0 +1,105 @@
+#include <xtypes/domain.h>
+#include <xtypes/xworkq.h>
+
+
+/*
+ * Initialize a work queue in caller-provided storage.
+ *
+ * wq:    the queue to set up.
+ * lock:  optional caller-owned lock. When non-NULL, xworkq_signal() only
+ *        runs jobs after successfully try-locking it. May be NULL.
+ * flags: stored verbatim in wq->flags; not interpreted here.
+ *
+ * Returns 0 on success, -1 when the backing xq could not be allocated.
+ * On failure wq->q is reset to NULL instead of being left pointing at
+ * freed memory.
+ */
+int xworkq_init(struct xworkq *wq, struct xlock *lock, uint32_t flags)
+{
+       wq->lock = lock;
+       wq->flags = flags;
+       /* xlock_release() doubles as the initializer for q_lock
+        * (presumably leaving it unlocked -- confirm against xlock.h) */
+       xlock_release(&wq->q_lock);
+
+       wq->q = xtypes_malloc(sizeof(struct xq));
+       if (!wq->q)
+               return -1;
+
+       /* start with 8 slots; __xworkq_enqueue() doubles the size on demand */
+       if (!xq_alloc_empty(wq->q, 8)){
+               xtypes_free(wq->q);
+               wq->q = NULL;
+               return -1;
+       }
+       return 0;
+}
+
+/*
+ * Tear down a work queue set up by xworkq_init().
+ *
+ * Jobs that are still queued are NOT executed. Their struct work
+ * containers are released with xtypes_free(), the same call
+ * xworkq_signal() uses after running a job, so they do not leak.
+ * Callers that need pending jobs to run must call xworkq_signal() first.
+ *
+ * q_lock is not taken: the caller must guarantee that nobody else is
+ * using wq while it is being destroyed. Calling this on a queue whose
+ * q pointer is NULL is a no-op.
+ */
+void xworkq_destroy(struct xworkq *wq)
+{
+       xqindex xqi;
+
+       if (!wq->q)
+               return;
+
+       /* drop whatever is still queued */
+       while ((xqi = __xq_pop_head(wq->q)) != Noneidx)
+               xtypes_free((struct work *)xqi);
+
+       xq_free(wq->q);
+       xtypes_free(wq->q);
+       wq->q = NULL;
+}
+
+/*
+ * Append an already-built work item to the queue, growing the queue
+ * when the append is refused.
+ *
+ * The whole operation runs under wq->q_lock. If the first append returns
+ * Noneidx, a new xq of twice the current size is allocated, the entries
+ * are moved over with __xq_resize(), the old queue is released and the
+ * append is retried on the new one.
+ *
+ * Returns 0 when w was queued, -1 otherwise (allocation or resize
+ * failure). On failure the original queue is left in place and w is
+ * still owned by the caller.
+ */
+int __xworkq_enqueue(struct xworkq *wq, struct work *w)
+{
+       xqindex r;
+       struct xq *newq;
+
+       /* 4 is the id passed to xlock_acquire() (presumably an owner tag) */
+       xlock_acquire(&wq->q_lock, 4);
+
+       r = __xq_append_tail(wq->q, (xqindex)w);
+       if (r != Noneidx)
+               goto out;
+
+       /* append refused: build a queue twice as large */
+       newq = xtypes_malloc(sizeof(struct xq));
+       if (!newq)
+               goto out;       /* r is still Noneidx */
+       if (!xq_alloc_empty(newq, wq->q->size*2)){
+               xtypes_free(newq);
+               goto out;
+       }
+       if (__xq_resize(wq->q, newq) == Noneidx){
+               xq_free(newq);
+               xtypes_free(newq);
+               goto out;
+       }
+
+       /* Release the old queue the same way every other path in this file
+        * releases an xq: xq_free() for its storage, then the struct itself.
+        * Freeing only the struct would leak the old storage on each resize. */
+       xq_free(wq->q);
+       xtypes_free(wq->q);
+       wq->q = newq;
+
+       r = __xq_append_tail(wq->q, (xqindex)w);
+out:
+       xlock_release(&wq->q_lock);
+
+       return ((r == Noneidx)? -1 : 0);
+}
+
+/*
+ * Queue a job for later execution by xworkq_signal().
+ *
+ * wq:     target work queue.
+ * job_fn: callback; it will be invoked as job_fn(wq, job).
+ * job:    opaque argument passed through to job_fn.
+ *
+ * A struct work container is allocated for the job and is freed by
+ * xworkq_signal() once the job has run.
+ *
+ * Returns 0 on success, -1 if the container could not be allocated or
+ * could not be queued. On failure nothing is left allocated.
+ */
+int xworkq_enqueue(struct xworkq *wq, void (*job_fn)(void *q, void *arg), void *job)
+{
+       //maybe use xobj
+       struct work *work = xtypes_malloc(sizeof(struct work));
+       if (!work)
+               return -1;
+       work->job_fn = job_fn;
+       work->job = job;
+       if (__xworkq_enqueue(wq, work) < 0){
+               /* the queue never took ownership, so release it here */
+               xtypes_free(work);
+               return -1;
+       }
+       return 0;
+}
+
+/*
+ * Pop one entry from the head of wq->q while holding q_lock.
+ * Returns whatever __xq_pop_head() returned; Noneidx ends the drain loop
+ * in xworkq_signal().
+ */
+static xqindex __xworkq_pop(struct xworkq *wq)
+{
+       xqindex xqi;
+
+       /* 3 is the id passed to xlock_acquire() (presumably an owner tag) */
+       xlock_acquire(&wq->q_lock, 3);
+       xqi = __xq_pop_head(wq->q);
+       xlock_release(&wq->q_lock);
+
+       return xqi;
+}
+
+/*
+ * Run queued jobs.
+ *
+ * While the queue reports a non-zero count:
+ *   - if an external lock was given to xworkq_init(), try to take it and
+ *     return immediately when that fails (whoever holds it is expected to
+ *     be draining the queue);
+ *   - pop entries one by one, call job_fn(wq, job) for each and free its
+ *     struct work container;
+ *   - release the external lock and re-check the count, so that work
+ *     enqueued by someone whose try-lock failed meanwhile is picked up.
+ *
+ * q_lock is held only around each pop, never while a job runs.
+ */
+void xworkq_signal(struct xworkq *wq)
+{
+       xqindex xqi;
+       struct work *w;
+
+       while (xq_count(wq->q)){
+               if (wq->lock && !xlock_try_lock(wq->lock, 2))
+                       return;
+
+               for (xqi = __xworkq_pop(wq); xqi != Noneidx; xqi = __xworkq_pop(wq)){
+                       w = (struct work *)xqi;
+                       w->job_fn(wq, w->job);
+                       xtypes_free(w);
+               }
+
+               if (wq->lock)
+                       xlock_release(wq->lock);
+       }
+}
+
+#ifdef __KERNEL__
+#include <linux/module.h>
+#include <xtypes/xworkq_exports.h>
+#endif
diff --git a/xseg/xtypes/xworkq.h b/xseg/xtypes/xworkq.h
new file mode 100644 (file)
index 0000000..4937f62
--- /dev/null
@@ -0,0 +1,21 @@
+#ifndef __X_WORK_H
+#define __X_WORK_H
+
+#include <xtypes/xq.h>
+#include <xtypes/xwork.h>
+
+struct xworkq {  /* queue of deferred jobs (struct work) */
+       struct xlock q_lock;  /* taken around every append to / pop from q */
+       uint32_t flags;  /* stored by xworkq_init(); not otherwise used in this patch */
+       struct xq *q;  /* allocated backing queue; entries are struct work pointers cast to xqindex */
+       struct xlock *lock;  /* optional caller-owned lock (may be NULL); xworkq_signal() try-locks it before running jobs */
+};
+
+int xworkq_init(struct xworkq *wq, struct xlock * lock,  uint32_t flags);  /* 0 on success, -1 on allocation failure */
+int xworkq_enqueue(struct xworkq *wq, void (*job_fn)(void *q, void *arg), void *job);  /* queue job_fn(wq, job); 0 on success, -1 on failure */
+void xworkq_signal(struct xworkq *wq);  /* run queued jobs; returns early if the external lock cannot be taken */
+void xworkq_destroy(struct xworkq *wq);  /* free the backing queue */
+
+
+
+#endif /* __X_WORK_H */
diff --git a/xseg/xtypes/xworkq_exports.h b/xseg/xtypes/xworkq_exports.h
new file mode 100644 (file)
index 0000000..ae954ac
--- /dev/null
@@ -0,0 +1,38 @@
+/*
+ * Copyright 2013 GRNET S.A. All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or
+ * without modification, are permitted provided that the following
+ * conditions are met:
+ *
+ *   1. Redistributions of source code must retain the above
+ *      copyright notice, this list of conditions and the following
+ *      disclaimer.
+ *   2. Redistributions in binary form must reproduce the above
+ *      copyright notice, this list of conditions and the following
+ *      disclaimer in the documentation and/or other materials
+ *      provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
+ * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+ * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
+ * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
+ * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
+ * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
+ * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
+ * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
+ * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+ * POSSIBILITY OF SUCH DAMAGE.
+ *
+ * The views and conclusions contained in the software and
+ * documentation are those of the authors and should not be
+ * interpreted as representing official policies, either expressed
+ * or implied, of GRNET S.A.
+ */
+
+EXPORT_SYMBOL(xworkq_init);
+EXPORT_SYMBOL(xworkq_destroy);
+EXPORT_SYMBOL(xworkq_enqueue);
+EXPORT_SYMBOL(xworkq_signal);
diff --git a/xseg/xtypes/xworkq_test.c b/xseg/xtypes/xworkq_test.c
new file mode 100644 (file)
index 0000000..61d2f9a
--- /dev/null
@@ -0,0 +1,181 @@
+#include "xworkq.h"
+#include <stdio.h>
+#include <stdlib.h>
+#include <pthread.h>
+#include <xtypes/xlock.h>
+#include <sys/time.h>
+
+
+unsigned long sum = 0;
+struct xlock lock;
+
+/*
+ * Job callback shared by all tests: 'arg' carries an integer value
+ * (not a real pointer) which is added to the global 'sum'.
+ */
+void jobfn(void *q, void *arg)
+{
+       (void)q;        /* the queue handle is not needed by this job */
+       sum += (unsigned long) arg;
+}
+
+/*
+ * Single-threaded test: enqueue n jobs that each add 1 to 'sum', run
+ * them, and check that sum == n.
+ *
+ * Returns 0 on success, -1 if the queue could not be initialized, an
+ * enqueue failed, or the sum does not match.
+ */
+int test1(unsigned long n)
+{
+       struct xworkq wq;
+       unsigned long i;
+       int enqueue_failed = 0;
+
+       if (xworkq_init(&wq, &lock, 0) < 0)
+               return -1;
+       sum = 0;
+       xlock_release(&lock);
+
+       for (i = 0; i < n; i++) {
+               if (xworkq_enqueue(&wq, jobfn, (void *)1) < 0) {
+                       enqueue_failed = 1;
+                       break;
+               }
+       }
+
+       /* Enqueueing only stores the jobs; they run when the queue is
+        * signalled. Without this call 'sum' stays 0 for any n > 0. */
+       xworkq_signal(&wq);
+
+       xworkq_destroy(&wq);
+
+       if (enqueue_failed)
+               return -1;
+       return ((sum == n)? 0 : -1);
+}
+
+struct thread_arg{  /* per-thread parameters handed to thread_test() */
+       struct xworkq *wq;  /* queue the thread enqueues into */
+       unsigned long n;  /* number of jobs the thread enqueues */
+       unsigned long num;  /* value each of those jobs adds to 'sum' (passed as the job argument) */
+};
+
+/*
+ * Thread body: enqueue targ->n jobs on targ->wq, each carrying
+ * targ->num as its argument. Enqueue results are not checked.
+ * Always returns NULL.
+ */
+void *thread_test(void *arg)
+{
+       struct thread_arg *ta = (struct thread_arg *)arg;
+       unsigned long remaining;
+
+       for (remaining = ta->n; remaining > 0; remaining--)
+               xworkq_enqueue(ta->wq, jobfn, (void *)ta->num);
+
+       return NULL;
+}
+
+/*
+ * Shared body of test2() and test3(): start nr_threads threads, each
+ * enqueueing n jobs whose argument is (thread index + 1), wait for all
+ * of them, run the queued jobs and compare 'sum' with
+ * n * (1 + 2 + ... + nr_threads).
+ *
+ * Returns 0 when the sum matches, -1 on any setup failure or mismatch.
+ * Every path joins the threads it started before 'wq' (which lives on
+ * this stack frame) goes away, and frees both arrays.
+ */
+static int run_enqueue_threads(unsigned long n, unsigned long nr_threads)
+{
+       unsigned long i, started = 0, expected_sum = 0;
+       int ret = -1;
+       struct xworkq wq;
+       struct thread_arg *targs;
+       pthread_t *threads;
+
+       if (xworkq_init(&wq, &lock, 0) < 0)
+               return -1;
+       sum = 0;
+       xlock_release(&lock);
+
+       /* one argument record and one handle per thread */
+       targs = calloc(nr_threads, sizeof(*targs));
+       threads = calloc(nr_threads, sizeof(*threads));
+       if (nr_threads && (!targs || !threads)) {
+               fprintf(stderr, "error allocating thread state\n");
+               goto out;
+       }
+
+       for (i = 0; i < nr_threads; i++) {
+               targs[i].num = i+1;
+               targs[i].n = n;
+               targs[i].wq = &wq;
+       }
+
+       for (started = 0; started < nr_threads; started++) {
+               if (pthread_create(&threads[started], NULL, thread_test,
+                                       &targs[started])) {
+                       fprintf(stderr, "error pthread_create\n");
+                       break;
+               }
+       }
+
+       /* join whatever was started, even after a creation failure */
+       for (i = 0; i < started; i++) {
+               pthread_join(threads[i], NULL);
+       }
+
+       /* The threads only enqueue; the jobs run when the queue is
+        * signalled. Without this call 'sum' would stay 0. */
+       xworkq_signal(&wq);
+
+       if (started == nr_threads) {
+               for (i = 0; i < nr_threads; i++) {
+                       expected_sum += n*(i+1);
+               }
+               ret = (sum == expected_sum) ? 0 : -1;
+       }
+out:
+       free(targs);
+       free(threads);
+       xworkq_destroy(&wq);
+
+       return ret;
+}
+
+/* Multi-threaded enqueue test; see run_enqueue_threads(). */
+int test2(unsigned long n, unsigned long nr_threads)
+{
+       return run_enqueue_threads(n, nr_threads);
+}
+
+/*
+ * Currently the same scenario as test2(); kept as a separate entry point
+ * because main() runs and reports it separately.
+ */
+int test3(unsigned long n, unsigned long nr_threads)
+{
+       return run_enqueue_threads(n, nr_threads);
+}
+
+/*
+ * Test driver.
+ *
+ * Usage: xworkq_test <nr_jobs> <nr_threads>
+ *
+ * Runs test1(nr_jobs), test2(nr_jobs, nr_threads) and
+ * test3(nr_jobs, nr_threads) in order, printing the result and the
+ * elapsed wall-clock time of each to stderr. Stops at the first failure.
+ *
+ * Returns 0 when all tests pass, -1 on bad arguments or a failed test.
+ */
+int main(int argc, const char *argv[])
+{
+       struct timeval start, end, tv;
+       int r;
+       int n, t;
+
+       /* both arguments are mandatory; atoi(NULL) would crash */
+       if (argc < 3) {
+               fprintf(stderr, "Usage: %s <nr_jobs> <nr_threads>\n",
+                               (argc > 0) ? argv[0] : "xworkq_test");
+               return -1;
+       }
+       n = atoi(argv[1]);
+       t = atoi(argv[2]);
+       /* the tests take unsigned counts; a negative value would wrap */
+       if (n < 0 || t < 0) {
+               fprintf(stderr, "nr_jobs and nr_threads must not be negative\n");
+               return -1;
+       }
+
+       fprintf(stderr, "Running test1\n");
+       gettimeofday(&start, NULL);
+       r = test1(n);
+       if (r < 0){
+               fprintf(stderr, "Test1: FAILED\n");
+               return -1;
+       }
+       gettimeofday(&end, NULL);
+       timersub(&end, &start, &tv);
+       fprintf(stderr, "Test1: PASSED\n");
+       fprintf(stderr, "Test time: %ds %dusec\n\n", (int)tv.tv_sec, (int)tv.tv_usec);
+
+       fprintf(stderr, "running test2\n");
+       gettimeofday(&start, NULL);
+       r = test2(n, t);
+       if (r < 0){
+               fprintf(stderr, "test2: failed\n");
+               return -1;
+       }
+       gettimeofday(&end, NULL);
+       fprintf(stderr, "test2: passed\n");
+       timersub(&end, &start, &tv);
+       fprintf(stderr, "Test time: %ds %dusec\n\n", (int)tv.tv_sec, (int)tv.tv_usec);
+
+       fprintf(stderr, "running test3\n");
+       gettimeofday(&start, NULL);
+       r = test3(n, t);
+       if (r < 0){
+               fprintf(stderr, "test3: failed\n");
+               return -1;
+       }
+       gettimeofday(&end, NULL);
+       fprintf(stderr, "test3: passed\n");
+       timersub(&end, &start, &tv);
+       fprintf(stderr, "Test time: %ds %dusec\n\n", (int)tv.tv_sec, (int)tv.tv_usec);
+
+       return 0;
+}