+/*
+ * Copyright 2012 GRNET S.A. All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or
+ * without modification, are permitted provided that the following
+ * conditions are met:
+ *
+ * 1. Redistributions of source code must retain the above
+ * copyright notice, this list of conditions and the following
+ * disclaimer.
+ * 2. Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following
+ * disclaimer in the documentation and/or other materials
+ * provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
+ * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+ * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
+ * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
+ * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
+ * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
+ * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
+ * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
+ * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+ * POSSIBILITY OF SUCH DAMAGE.
+ *
+ * The views and conclusions contained in the software and
+ * documentation are those of the authors and should not be
+ * interpreted as representing official policies, either expressed
+ * or implied, of GRNET S.A.
+ */
+
#define _GNU_SOURCE
#include <stdio.h>
#include <stdlib.h>
#include <sys/types.h>
#include <unistd.h>
#include <xseg/xseg.h>
-#include <pthread.h>
-#include <mpeer.h>
+#include <speer.h>
#include <sys/syscall.h>
#include <sys/time.h>
#include <signal.h>
-#define REARRANGE(__fun_name__, __format__, ...) __format__ "%s", __fun_name__, ##__VA_ARGS__
-#define LOG(level, ...) \
- do { \
- if (level <= verbose) { \
- fprintf(stderr, "%s: " REARRANGE( __func__ , ## __VA_ARGS__, "" )); \
- } \
- }while (0)
-
+#ifdef ST_THREADS
+#include <st.h>
+uint32_t ta = 0;
+#endif
-unsigned int verbose = 0;
+unsigned int verbose;
struct log_ctx lc;
-struct thread {
- struct peerd *peer;
- pthread_t tid;
- pthread_cond_t cond;
- pthread_mutex_t lock;
- void (*func)(void *arg);
- void *arg;
-};
-
-
inline int canDefer(struct peerd *peer)
{
return !(peer->defer_portno == NoPort);
(unsigned long long)req->datalen, data);
}
}
+
void log_pr(char *msg, struct peer_req *pr)
{
char target[64], data[64];
xq_append_head(&peer->free_reqs, idx, 1);
}
-inline static struct thread* alloc_thread(struct peerd *peer)
-{
- xqindex idx = xq_pop_head(&peer->threads, 1);
- if (idx == Noneidx)
- return NULL;
- return peer->thread + idx;
-}
-
-inline static void free_thread(struct peerd *peer, struct thread *t)
-{
- xqindex idx = t - peer->thread;
- xq_append_head(&peer->threads, idx, 1);
-}
-
-
-inline static void __wake_up_thread(struct thread *t)
-{
- pthread_mutex_lock(&t->lock);
- pthread_cond_signal(&t->cond);
- pthread_mutex_unlock(&t->lock);
-}
-
-inline static void wake_up_thread(struct thread* t)
-{
- if (t){
- __wake_up_thread(t);
- }
-}
-
-inline static int wake_up_next_thread(struct peerd *peer)
-{
- //struct thread *t = alloc_thread(peer);
- //wake_up_thread(t);
- //return t;
- return (xseg_signal(peer->xseg, peer->portno_start));
-}
-
struct timeval resp_start, resp_end, resp_accum = {0, 0};
uint64_t responds = 0;
void get_responds_stats(){
printf("Time waiting respond %lu.%06lu sec for %llu times.\n",
- //(unsigned int)(t - peer->thread),
resp_accum.tv_sec, resp_accum.tv_usec, (long long unsigned int) responds);
}
if (xseg_signal(peer->xseg, p) < 0)
XSEGLOG2(&lc, W, "Cannot signal portno %u", p);
free_peer_req(peer, pr);
- wake_up_next_thread(peer);
}
//FIXME error check
if (xseg_signal(peer->xseg, p) < 0)
XSEGLOG2(&lc, W, "Cannot signal portno %u", p);
free_peer_req(peer, pr);
- wake_up_next_thread(peer);
-}
-
-void pending(struct peerd *peer, struct peer_req *pr)
-{
- pr->req->state = XS_PENDING;
}
static void handle_accepted(struct peerd *peer, struct peer_req *pr,
//assert xreq == req;
XSEGLOG2(&lc, D, "Handle accepted");
xreq->serviced = 0;
- //xreq->state = XS_ACCEPTED;
pr->retval = 0;
- dispatch(peer, pr, req);
+ dispatch(peer, pr, req, dispatch_accept);
}
static void handle_received(struct peerd *peer, struct peer_req *pr,
struct xseg_request *req)
{
//struct xseg_request *req = pr->req;
- //assert req->state != XS_ACCEPTED;
XSEGLOG2(&lc, D, "Handle received \n");
- dispatch(peer, pr, req);
+ dispatch(peer, pr, req, dispatch_receive);
}
+
struct timeval sub_start, sub_end, sub_accum = {0, 0};
uint64_t submits = 0;
void get_submits_stats(){
printf("Time waiting submit %lu.%06lu sec for %llu times.\n",
- //(unsigned int)(t - peer->thread),
sub_accum.tv_sec, sub_accum.tv_usec, (long long unsigned int) submits);
}
uint32_t ret;
struct xseg_request *req = pr->req;
// assert req->portno == peer->portno ?
- //TODO small function with error checking
XSEGLOG2 (&lc, D, "submitting peer req %u\n", (unsigned int)(pr - peer->peer_reqs));
ret = xseg_set_req_data(peer->xseg, req, (void *)(pr));
if (ret < 0)
return 0;
}
-int thread_execute(struct peerd *peer, void (*func)(void *arg), void *arg)
+/*
+ * Make one non-blocking pass over every port in
+ * [peer->portno_start, peer->portno_end].
+ *
+ * For each port the function first tries to accept a new request (only
+ * when a free peer_req could be allocated) and then tries to receive a
+ * request on the same port.  Accepted requests go to handle_accepted(),
+ * received ones to handle_received().  A received request that carries
+ * no peer_req data is responded to immediately, or put back if the
+ * respond fails.
+ *
+ * Returns 1 if at least one request was handed to handle_accepted() or
+ * handle_received() during the pass, 0 otherwise.
+ */
+static int check_ports(struct peerd *peer)
{
- struct thread *t = alloc_thread(peer);
- if (t) {
- t->func = func;
- t->arg = arg;
- wake_up_thread(t);
- return 0;
- } else
- // we could hijack a thread
- return -1;
+	struct xseg *xseg = peer->xseg;
+	const xport first = peer->portno_start;
+	const xport last = peer->portno_end;
+	int serviced = 0;
+	xport port;
+
+	for (port = first; port <= last; port++) {
+		struct xseg_request *req;
+		struct peer_req *pr;
+		xport dst;
+		int rc;
+
+		/* Accept path: a peer_req must be reserved before accepting. */
+		pr = alloc_peer_req(peer);
+		if (pr) {
+			req = xseg_accept(xseg, port, X_NONBLOCK);
+			if (!req) {
+				/* Nothing to accept, give the slot back. */
+				free_peer_req(peer, pr);
+			} else {
+				pr->req = req;
+				pr->portno = port;
+				xseg_cancel_wait(xseg, port);
+				handle_accepted(peer, pr, req);
+				serviced = 1;
+			}
+		}
+
+		/* Receive path: the peer_req is recovered from the request. */
+		req = xseg_receive(xseg, port, X_NONBLOCK);
+		if (!req)
+			continue;
+
+		rc = xseg_get_req_data(xseg, req, (void **) &pr);
+		if (rc >= 0 && pr) {
+			//maybe perform sanity check for pr
+			xseg_cancel_wait(xseg, port);
+			handle_received(peer, pr, req);
+			serviced = 1;
+			continue;
+		}
+
+		/* No peer_req attached: respond directly, or drop the request. */
+		XSEGLOG2(&lc, W, "Received request with no pr data\n");
+		dst = xseg_respond(peer->xseg, req, peer->portno_start, X_ALLOC);
+		if (dst == NoPort) {
+			XSEGLOG2(&lc, W, "Could not respond stale request");
+			xseg_put_request(xseg, req, first);
+		} else {
+			xseg_signal(xseg, dst);
+		}
+	}
+
+	return serviced;
}
-static void* thread_loop(void *arg)
+/*
+ * Main service loop of the single-threaded peer.
+ *
+ * Repeatedly polls all ports through check_ports().  Every successful
+ * pass resets the idle counter to `threshold`; once `threshold`
+ * consecutive passes find nothing, the peer either yields to other
+ * state threads (ST_THREADS build with `ta` non-zero) or blocks in
+ * xseg_wait_signal().  xseg_prepare_wait() is issued right before the
+ * last pass of a polling round.
+ *
+ * `threshold` is 1000 divided by the number of ports served, clamped
+ * to at least 2: a value of 0 would skip polling altogether, and a
+ * value of 1 would fall through to the sleep path straight after a
+ * successful pass, without a fresh xseg_prepare_wait() and idle pass.
+ *
+ * The outer loop has no exit condition, so the cleanup and `return 0`
+ * at the bottom are not reached as the code stands.
+ */
+static int peerd_loop(struct peerd *peer)
{
- struct thread *t = (struct thread *) arg;
- struct peerd *peer = t->peer;
struct xseg *xseg = peer->xseg;
xport portno_start = peer->portno_start;
xport portno_end = peer->portno_end;
- struct peer_req *pr;
- uint64_t threshold=1;
+	uint64_t nr_ports = 1;
+	uint64_t threshold;
pid_t pid =syscall(SYS_gettid);
uint64_t loops;
- struct xseg_request *accepted, *received;
- int r;
- int change;
- xport i;
-
- XSEGLOG2(&lc, D, "thread %u\n", (unsigned int) (t- peer->thread));
-
- XSEGLOG2(&lc, I, "Thread %u has tid %u.\n", (unsigned int) (t- peer->thread), pid);
+
+	/*
+	 * Count the ports without ever producing a zero divisor: an
+	 * inverted range is treated as a single port.
+	 */
+	if (portno_end >= portno_start)
+		nr_ports = (uint64_t)(portno_end - portno_start) + 1;
+	threshold = 1000 / nr_ports;
+	if (threshold < 2)
+		threshold = 2;
+
+	XSEGLOG2(&lc, I, "Peer has tid %u.\n", (unsigned int) pid);
xseg_init_local_signal(xseg, peer->portno_start);
for (;;) {
- if (t->func) {
- XSEGLOG2(&lc, D, "Thread %u executes function\n", (unsigned int) (t- peer->thread));
- xseg_cancel_wait(xseg, peer->portno_start);
- t->func(t->arg);
- t->func = NULL;
- t->arg = NULL;
- continue;
- }
-
for(loops= threshold; loops > 0; loops--) {
- do {
- if (loops == 1)
- xseg_prepare_wait(xseg, peer->portno_start);
- change = 0;
- for (i = portno_start; i <= portno_end; i++) {
- accepted = NULL;
- received = NULL;
- pr = alloc_peer_req(peer);
- if (pr) {
- accepted = xseg_accept(xseg, i, X_NONBLOCK);
- if (accepted) {
- XSEGLOG2(&lc, D, "Thread %u accepted\n", (unsigned int) (t- peer->thread));
- pr->req = accepted;
- pr->portno = i;
- xseg_cancel_wait(xseg, i);
- handle_accepted(peer, pr, accepted);
- change = 1;
- }
- else {
- free_peer_req(peer, pr);
- }
- }
- received = xseg_receive(xseg, i, X_NONBLOCK);
- if (received) {
- //printf("received req id: %u\n", received - xseg->requests);
- //print_req(peer->xseg, received);
- r = xseg_get_req_data(xseg, received, (void **) &pr);
- if (r < 0 || !pr){
- //FIXME what to do here ?
- XSEGLOG2(&lc, W, "Received request with no pr data\n");
- xseg_respond(peer->xseg, received, peer->portno_start, X_ALLOC);
- //FIXME if fails, put req
- }
- xseg_cancel_wait(xseg, i);
- handle_received(peer, pr, received);
- change = 1;
- }
- }
- if (change)
- loops = threshold;
- }while (change);
+			/* Arm the wait before the final pass of this round. */
+			if (loops == 1)
+				xseg_prepare_wait(xseg, peer->portno_start);
+			/* Any serviced request restarts the idle countdown. */
+			if (check_ports(peer))
+				loops = threshold;
}
- XSEGLOG2(&lc, I, "Thread %u goes to sleep\n", (unsigned int) (t- peer->thread));
- xseg_wait_signal(xseg, 10000000UL);
- xseg_cancel_wait(xseg, peer->portno_start);
- XSEGLOG2(&lc, I, "Thread %u woke up\n", (unsigned int) (t- peer->thread));
+#ifdef ST_THREADS
+		if (ta){
+			/* Yield to other state threads instead of blocking. */
+			st_sleep(0);
+		} else {
+#endif
+			XSEGLOG2(&lc, I, "Peer goes to sleep\n");
+			xseg_wait_signal(xseg, 10000000UL);
+			xseg_cancel_wait(xseg, peer->portno_start);
+			XSEGLOG2(&lc, I, "Peer woke up\n");
+#ifdef ST_THREADS
+		}
+#endif
}
- return NULL;
+	/* Not reached: the loop above never terminates. */
+	xseg_quit_local_signal(xseg, peer->portno_start);
+	return 0;
}
void defer_request(struct peerd *peer, struct peer_req *pr)
// free_peer_req(peer, pr);
}
-static int peerd_loop(struct peerd *peer)
-{
- if (peer->interactive_func)
- peer->interactive_func();
- for (;;) {
- pthread_join(peer->thread[0].tid, NULL);
- }
- return 0;
-}
-
static struct xseg *join(char *spec)
{
struct xseg_config config;
return xseg_join(config.type, config.name, "posix", NULL);
}
-int peerd_start_threads(struct peerd *peer)
-{
- int i;
- uint32_t nr_threads = peer->nr_threads;
- //TODO err check
- for (i = 0; i < nr_threads; i++) {
- peer->thread[i].peer = peer;
- pthread_cond_init(&peer->thread[i].cond,NULL);
- pthread_mutex_init(&peer->thread[i].lock, NULL);
- pthread_create(&peer->thread[i].tid, NULL, thread_loop, (void *)(peer->thread + i));
- peer->thread[i].func = NULL;
- peer->thread[i].arg = NULL;
-
- }
- return 0;
-}
-
static struct peerd* peerd_init(uint32_t nr_ops, char* spec, long portno_start,
- long portno_end, uint32_t nr_threads, uint32_t defer_portno)
+ long portno_end, uint32_t defer_portno)
{
int i;
struct peerd *peer;
+ struct xseg_port *port;
+
+#ifdef ST_THREADS
+ st_init();
+#endif
peer = malloc(sizeof(struct peerd));
if (!peer) {
perror("malloc");
}
peer->nr_ops = nr_ops;
peer->defer_portno = defer_portno;
- peer->nr_threads = nr_threads;
- peer->thread = calloc(nr_threads, sizeof(struct thread));
- if (!peer->thread)
- goto malloc_fail;
peer->peer_reqs = calloc(nr_ops, sizeof(struct peer_req));
if (!peer->peer_reqs){
malloc_fail:
if (!xq_alloc_seq(&peer->free_reqs, nr_ops, nr_ops))
goto malloc_fail;
- if (!xq_alloc_empty(&peer->threads, nr_threads))
- goto malloc_fail;
if (xseg_initialize()){
printf("cannot initialize library\n");
peer->portno_start = (xport) portno_start;
peer->portno_end= (xport) portno_end;
- peer->port = xseg_bind_port(peer->xseg, peer->portno_start, NULL);
- if (!peer->port){
+ port = xseg_bind_port(peer->xseg, peer->portno_start, NULL);
+ if (!port){
printf("cannot bind to port %ld\n", peer->portno_start);
return NULL;
}
xport p;
for (p = peer->portno_start + 1; p <= peer->portno_end; p++) {
struct xseg_port *tmp;
- tmp = xseg_bind_port(peer->xseg, p, (void *)xseg_get_signal_desc(peer->xseg, peer->port));
+ tmp = xseg_bind_port(peer->xseg, p, (void *)xseg_get_signal_desc(peer->xseg, port));
if (!tmp){
printf("cannot bind to port %ld\n", p);
return NULL;
peer->peer_reqs[i].req = NULL;
peer->peer_reqs[i].retval = 0;
peer->peer_reqs[i].priv = NULL;
+ peer->peer_reqs[i].portno = NoPort;
+#ifdef ST_THREADS
+ peer->peer_reqs[i].cond = st_cond_new(); //FIXME err check
+#endif
}
- peer->interactive_func = NULL;
return peer;
}
//parse args
char *spec = "";
int i, r;
- long portno_start = -1, portno_end = -1;
+ long portno_start = -1, portno_end = -1, portno = -1;;
//set defaults here
uint32_t nr_ops = 16;
- uint32_t nr_threads = 1 ;
unsigned int debug_level = 0;
uint32_t defer_portno = NoPort;
char *logfile = NULL;
- //capture here -g spec, -n nr_ops, -p portno, -t nr_threads -v verbose level
+ //capture here -g spec, -n nr_ops, -p portno, -v verbose level
// -dp xseg_portno to defer blocking requests
// -l log file ?
//TODO print messages on arg parsing error
i += 1;
continue;
}
+
+ if (!strcmp(argv[i], "-p") && i + 1 < argc) {
+ portno = strtoul(argv[i+1], NULL, 10);
+ i += 1;
+ continue;
+ }
if (!strcmp(argv[i], "-n") && i + 1 < argc) {
nr_ops = strtoul(argv[i+1], NULL, 10);
}
init_logctx(&lc, argv[0], debug_level, logfile);
- XSEGLOG2(&lc, D, "Main thread has tid %ld.\n", syscall(SYS_gettid));
-
//TODO perform argument sanity checks
verbose = debug_level;
+ if (portno != -1) {
+ portno_start = portno;
+ portno_end = portno;
+ }
+
//TODO err check
- peer = peerd_init(nr_ops, spec, portno_start, portno_end, nr_threads, defer_portno);
+ peer = peerd_init(nr_ops, spec, portno_start, portno_end, defer_portno);
if (!peer)
return -1;
r = custom_peer_init(peer, argc, argv);
if (r < 0)
return -1;
- peerd_start_threads(peer);
+#ifdef ST_THREADS
+ st_thread_t st = st_thread_create(peerd_loop, peer, 1, 0);
+ return st_thread_join(st, NULL);
+#else
return peerd_loop(peer);
+#endif
}