+/*
+ * Copyright 2012 GRNET S.A. All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or
+ * without modification, are permitted provided that the following
+ * conditions are met:
+ *
+ * 1. Redistributions of source code must retain the above
+ * copyright notice, this list of conditions and the following
+ * disclaimer.
+ * 2. Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following
+ * disclaimer in the documentation and/or other materials
+ * provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
+ * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+ * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
+ * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
+ * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
+ * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
+ * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
+ * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
+ * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+ * POSSIBILITY OF SUCH DAMAGE.
+ *
+ * The views and conclusions contained in the software and
+ * documentation are those of the authors and should not be
+ * interpreted as representing official policies, either expressed
+ * or implied, of GRNET S.A.
+ */
+
#define _GNU_SOURCE
#include <stdio.h>
#include <stdlib.h>
#include <sys/types.h>
#include <unistd.h>
-#include <xseg/xseg.h>
-#include <peer.h>
#include <sys/syscall.h>
#include <sys/time.h>
#include <signal.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <errno.h>
+
#ifdef MT
#include <pthread.h>
#endif
+#include <xseg/xseg.h>
+#include <peer.h>
+
#ifdef MT
#define PEER_TYPE "pthread"
#else
#define PEER_TYPE "posix"
#endif
+//FIXME this should not be defined here probably
+#define MAX_SPEC_LEN 128
+#define MAX_PIDFILE_LEN 512
+
volatile unsigned int terminated = 0;
unsigned int verbose = 0;
struct log_ctx lc;
#endif
#ifdef MT
+struct peerd *global_peer;
+
struct thread {
struct peerd *peer;
pthread_t tid;
pthread_cond_t cond;
pthread_mutex_t lock;
+ int thread_no; /* NOTE(review): never read or written in the visible code; presumably this thread's index in peer->thread -- confirm */
void (*func)(void *arg);
void *arg;
};
-
inline static struct thread* alloc_thread(struct peerd *peer)
{
xqindex idx = xq_pop_head(&peer->threads, 1);
}
#endif
+/*
+ * Termination signal handler (setup_signals() installs it for SIGQUIT).
+ * It logs the event, raises the global `terminated` flag and, in MT builds,
+ * calls wake_up_next_thread() on global_peer.
+ *
+ * It has external linkage (no `static`) so that it can also be called from
+ * another file, such as bench-xseg.c.
+ *
+ * NOTE(review): XSEGLOG2() and wake_up_next_thread() run in signal context
+ * here; confirm that both are async-signal-safe.
+ */
-static inline int isTerminate()
+void signal_handler(int signal)
{
-/* ta doesn't need to be taken into account, because the main loops
- * doesn't check the terminated flag if ta is not 0.
- *
-#ifdef ST_THREADS
- return (!ta & terminated);
-#else
- return terminated;
+ XSEGLOG2(&lc, I, "Caught signal. Terminating gracefully");
+ terminated = 1;
+#ifdef MT
+ wake_up_next_thread(global_peer);
#endif
- */
- return terminated;
}
-void signal_handler(int signal)
+void renew_logfile(int signal) /* handler that setup_signals() installs for SIGUSR1 */
{
- XSEGLOG2(&lc, I, "Caught signal. Terminating gracefully");
- terminated = 1;
+ XSEGLOG2(&lc, I, "Caught signal. Renewing logfile");
+ renew_logctx(&lc, NULL, verbose, NULL, REOPEN_FILE); /* re-run log setup with the current verbose level; NOTE(review): REOPEN_FILE presumably reopens the existing log path (e.g. after rotation) -- confirm */
}
-static int setup_signals()
+static int setup_signals(struct peerd *peer) /* installs the SIGQUIT and SIGUSR1 handlers; returns the first negative sigaction() result, otherwise the result of the last sigaction() call */
{
int r;
struct sigaction sa;
+#ifdef MT
+ global_peer = peer; /* signal_handler() receives no peer argument, so it reaches the peer through this global */
+#endif
sigemptyset(&sa.sa_mask);
sa.sa_flags = 0;
sa.sa_handler = signal_handler;
r = sigaction(SIGQUIT, &sa, NULL);
if (r < 0)
return r;
+
+ sa.sa_handler = renew_logfile; /* same mask and flags, different handler: SIGUSR1 renews the log file */
+ r = sigaction(SIGUSR1, &sa, NULL);
+ if (r < 0)
+ return r;
+
return r;
}
strncpy(data, req_data, 63);
data[63] = 0;
printf("req id:%lu, op:%u %llu:%lu serviced: %lu, reqstate: %u\n"
- "src: %u, st: %u, dst: %u dt: %u\n"
+ "src: %u, transit: %u, dst: %u effective dst: %u\n"
"target[%u]:'%s', data[%llu]:\n%s------------------\n\n",
(unsigned long)(req),
(unsigned int)req->op,
(unsigned long)req->serviced,
(unsigned int)req->state,
(unsigned int)req->src_portno,
- (unsigned int)req->src_transit_portno,
+ (unsigned int)req->transit_portno,
(unsigned int)req->dst_portno,
- (unsigned int)req->dst_transit_portno,
+ (unsigned int)req->effective_dst_portno,
(unsigned int)req->targetlen, target,
(unsigned long long)req->datalen, data);
}
#endif
}
-static void handle_accepted(struct peerd *peer, struct peer_req *pr,
+static void handle_accepted(struct peerd *peer, struct peer_req *pr,
struct xseg_request *req)
{
struct xseg_request *xreq = pr->req;
return 0;
}
-static int check_ports(struct peerd *peer)
+int check_ports(struct peerd *peer)
{
struct xseg *xseg = peer->xseg;
xport portno_start = peer->portno_start;
}
#ifdef MT
-int thread_execute(struct peerd *peer, void (*func)(void *arg), void *arg)
-{
- struct thread *t = alloc_thread(peer);
- if (t) {
- t->func = func;
- t->arg = arg;
- wake_up_thread(t);
- return 0;
- } else
- // we could hijack a thread
- return -1;
-}
-
static void* thread_loop(void *arg)
{
struct thread *t = (struct thread *) arg;
pid_t pid =syscall(SYS_gettid);
uint64_t loops;
uint64_t threshold=1000/(1 + portno_end - portno_start);
-
- XSEGLOG2(&lc, D, "thread %u\n", (unsigned int) (t- peer->thread));
+ XSEGLOG2(&lc, D, "thread %u\n", (unsigned int) (t- peer->thread));
XSEGLOG2(&lc, I, "Thread %u has tid %u.\n", (unsigned int) (t- peer->thread), pid);
xseg_init_local_signal(xseg, peer->portno_start);
for (;!(isTerminate() && xq_count(&peer->free_reqs) == peer->nr_ops);) {
- if (t->func) {
- XSEGLOG2(&lc, D, "Thread %u executes function\n", (unsigned int) (t- peer->thread));
- xseg_cancel_wait(xseg, peer->portno_start);
- t->func(t->arg);
- t->func = NULL;
- t->arg = NULL;
- continue;
- }
-
- for(loops= threshold; loops > 0; loops--) {
+ for(loops = threshold; loops > 0; loops--) {
if (loops == 1)
xseg_prepare_wait(xseg, peer->portno_start);
if (check_ports(peer))
xseg_cancel_wait(xseg, peer->portno_start);
XSEGLOG2(&lc, I, "Thread %u woke up\n", (unsigned int) (t- peer->thread));
}
+ wake_up_next_thread(peer);
+ custom_peer_finalize(peer);
+ return NULL;
+}
+
+/*
+ * Entry point of every worker thread (passed to pthread_create()).
+ *
+ * arg is the thread's own struct thread. The function builds the
+ * identifier string "Thread <num>", where <num> is the thread's index in
+ * peer->thread, stores it in t->arg and then runs peer->peerd_loop(t).
+ * When the loop returns it wakes up the next thread and finalizes the
+ * custom peer. Always returns NULL.
+ */
+void *init_thread_loop(void *arg)
+{
+	struct thread *t = (struct thread *) arg;
+	struct peerd *peer = t->peer;
+	/* "Thread " (7 chars) + the digits of an unsigned int + NUL fit easily */
+	const size_t id_len = 32;
+	char *thread_id;
+
+	/*
+	 * The loop prints this identifier with "%s", so it must stay
+	 * NUL-terminated: do not shrink the buffer below strlen() + 1.
+	 * The string is left allocated on purpose; it is handed to the loop
+	 * through t->arg and nothing visible here takes ownership of it.
+	 */
+	thread_id = malloc(id_len);
+	if (!thread_id) {
+		XSEGLOG2(&lc, E, "Cannot allocate thread identifier");
+		goto out;
+	}
+	snprintf(thread_id, id_len, "Thread %u",
+			(unsigned int) (t - peer->thread));
+	t->arg = thread_id;
+
+	//Start thread loop
+	(void)peer->peerd_loop(t);
+
+out:
+	wake_up_next_thread(peer);
+	custom_peer_finalize(peer);
+
return NULL;
}
uint32_t nr_threads = peer->nr_threads;
//TODO err check
for (i = 0; i < nr_threads; i++) {
+ peer->thread[i].func = NULL;
+ peer->thread[i].arg = NULL;
peer->thread[i].peer = peer;
pthread_cond_init(&peer->thread[i].cond,NULL);
pthread_mutex_init(&peer->thread[i].lock, NULL);
- pthread_create(&peer->thread[i].tid, NULL, thread_loop, (void *)(peer->thread + i));
- peer->thread[i].func = NULL;
- peer->thread[i].arg = NULL;
+ pthread_create(&peer->thread[i].tid, NULL,
+ init_thread_loop, (void *)(peer->thread + i));
+ }
+ if (peer->interactive_func)
+ peer->interactive_func();
+ for (i = 0; i < nr_threads; i++) {
+ pthread_join(peer->thread[i].tid, NULL);
}
+
return 0;
}
#endif
-void defer_request(struct peerd *peer, struct peer_req *pr)
+/*
+ * Forward the xseg request held by pr to the peer's defer port.
+ *
+ * Returns -1 when the peer cannot defer (canDefer() is false) or when
+ * xseg_forward() yields NoPort; in both cases pr is left untouched.
+ * Returns 0 once the request has been forwarded; pr is then released with
+ * free_peer_req(). A failure to signal the target port is only logged as a
+ * warning and does not change the return value.
+ */
+int defer_request(struct peerd *peer, struct peer_req *pr)
{
- // assert canDefer(peer);
-// xseg_submit(peer->xseg, peer->defer_portno, pr->req);
-// xseg_signal(peer->xseg, peer->defer_portno);
-// free_peer_req(peer, pr);
+	int r;
+	xport p;
+	if (!canDefer(peer)){
+		XSEGLOG2(&lc, E, "Peer cannot defer requests");
+		return -1;
+	}
+	p = xseg_forward(peer->xseg, pr->req, peer->defer_portno, pr->portno,
+			X_ALLOC);
+	if (p == NoPort){
+		/* %lx needs an unsigned long, not a pointer */
+		XSEGLOG2(&lc, E, "Cannot defer request %lx",
+				(unsigned long) pr->req);
+		return -1;
+	}
+	r = xseg_signal(peer->xseg, p);
+	if (r < 0) {
+		/* print ports the same way peerd_init() does: %u + cast */
+		XSEGLOG2(&lc, W, "Cannot signal port %u", (unsigned int) p);
+	}
+	free_peer_req(peer, pr);
+	return 0;
}
-static int peerd_loop(struct peerd *peer)
+/*
+ * generic_peerd_loop is a general-purpose port-checker loop that is
+ * suitable both for multi-threaded and single-threaded peers.
+ *
+ * arg is a struct thread * in MT builds (the identifier used in log
+ * messages is then taken from t->arg) and a struct peerd * otherwise (the
+ * identifier is then "Peer"). The identifier is printed with "%s", so it
+ * must be a NUL-terminated string.
+ *
+ * Each iteration polls check_ports() until it has returned zero
+ * `threshold` times in a row, then calls xseg_prepare_wait(), sleeps in
+ * xseg_wait_signal() and calls xseg_cancel_wait(). The loop ends once
+ * isTerminate() is true and every peer request is back in free_reqs.
+ * Always returns 0.
+ *
+ * NOTE(review): xseg_prepare_wait() is called after the last check_ports()
+ * pass and the ports are not checked again before xseg_wait_signal();
+ * confirm that a request arriving in between cannot go unnoticed until the
+ * wait times out.
+ */
+static int generic_peerd_loop(void *arg)
{
#ifdef MT
- int i;
- if (peer->interactive_func)
- peer->interactive_func();
- for (i = 0; i < peer->nr_threads; i++) {
- pthread_join(peer->thread[i].tid, NULL);
- }
+	struct thread *t = (struct thread *) arg;
+	struct peerd *peer = t->peer;
+	char *id = t->arg;
#else
+	struct peerd *peer = (struct peerd *) arg;
+	/* a string literal is NUL-terminated, unlike a 4-char array */
+	const char *id = "Peer";
+#endif
struct xseg *xseg = peer->xseg;
xport portno_start = peer->portno_start;
xport portno_end = peer->portno_end;
+	pid_t pid = syscall(SYS_gettid);
uint64_t threshold=1000/(1 + portno_end - portno_start);
- pid_t pid =syscall(SYS_gettid);
uint64_t loops;
-
- XSEGLOG2(&lc, I, "Peer has tid %u.\n", pid);
+
+	XSEGLOG2(&lc, I, "%s has tid %u.\n", id, (unsigned int) pid);
xseg_init_local_signal(xseg, peer->portno_start);
for (;!(isTerminate() && xq_count(&peer->free_reqs) == peer->nr_ops);) {
- for(loops= threshold; loops > 0; loops--) {
- if (loops == 1)
- xseg_prepare_wait(xseg, peer->portno_start);
+#ifdef MT
+		if (t->func) {
+			XSEGLOG2(&lc, D, "Thread %u executes function\n", (unsigned int) (t- peer->thread));
+			xseg_cancel_wait(xseg, peer->portno_start);
+			t->func(t->arg);
+			t->func = NULL;
+			t->arg = NULL;
+			continue;
+		}
+#endif
+		//Heart of peerd_loop. This loop is common for everyone.
+		for(loops = threshold; loops > 0; loops--) {
if (check_ports(peer))
loops = threshold;
}
+		xseg_prepare_wait(xseg, peer->portno_start);
#ifdef ST_THREADS
if (ta){
st_sleep(0);
- } else {
-#endif
- XSEGLOG2(&lc, I, "Peer goes to sleep\n");
- xseg_wait_signal(xseg, 10000000UL);
- xseg_cancel_wait(xseg, peer->portno_start);
- XSEGLOG2(&lc, I, "Peer woke up\n");
-#ifdef ST_THREADS
+			continue;
}
#endif
+		XSEGLOG2(&lc, I, "%s goes to sleep\n", id);
+		xseg_wait_signal(xseg, 10000000UL);
+		xseg_cancel_wait(xseg, peer->portno_start);
+		XSEGLOG2(&lc, I, "%s woke up\n", id);
}
+	return 0;
+}
+
+static int init_peerd_loop(struct peerd *peer) /* non-MT entry point used by main(): runs the peer's loop, then finalizes the custom peer and quits local signalling */
+{
+ struct xseg *xseg = peer->xseg;
+
+ peer->peerd_loop(peer); /* the loop's return value is ignored; this function always returns 0 */
+ custom_peer_finalize(peer);
xseg_quit_local_signal(xseg, peer->portno_start);
-#endif
+
return 0;
}
}
static struct peerd* peerd_init(uint32_t nr_ops, char* spec, long portno_start,
- long portno_end, uint32_t nr_threads, uint32_t defer_portno)
+ long portno_end, uint32_t nr_threads, xport defer_portno)
{
int i;
struct peerd *peer;
return NULL;
}
peer->xseg = join(spec);
- if (!peer->xseg)
+ if (!peer->xseg)
return NULL;
peer->portno_start = (xport) portno_start;
peer->portno_end= (xport) portno_end;
port = xseg_bind_port(peer->xseg, peer->portno_start, NULL);
if (!port){
- printf("cannot bind to port %ld\n", peer->portno_start);
+ printf("cannot bind to port %u\n", (unsigned int) peer->portno_start);
return NULL;
}
struct xseg_port *tmp;
tmp = xseg_bind_port(peer->xseg, p, (void *)xseg_get_signal_desc(peer->xseg, port));
if (!tmp){
- printf("cannot bind to port %ld\n", p);
+ printf("cannot bind to port %u\n", (unsigned int) p);
return NULL;
}
}
peer->peer_reqs[i].retval = 0;
peer->peer_reqs[i].priv = NULL;
peer->peer_reqs[i].portno = NoPort;
+
+ //Plug default peerd_loop. This can change later on by custom_peer_init.
+ peer->peerd_loop = generic_peerd_loop;
+
#ifdef ST_THREADS
peer->peer_reqs[i].cond = st_cond_new(); //FIXME err check
#endif
int pidfile_write(int pid_fd)
{
char buf[16];
- snprintf(buf, sizeof(buf), "%d", syscall(SYS_gettid));
+ snprintf(buf, sizeof(buf), "%ld", syscall(SYS_gettid)); /* syscall() returns long, so %ld is the matching conversion */
buf[15] = 0;
-
+
lseek(pid_fd, 0, SEEK_SET);
- int ret = write(pid_fd, buf, 16);
+ int ret = write(pid_fd, buf, strlen(buf)); /* write only the digits, not the whole 16-byte buffer */
return ret;
}
+/*
+ * Exclusively create the pidfile at path and return its descriptor.
+ *
+ * On failure the negative result of open() is returned. If the failure is
+ * EEXIST, pidfile_read() is asked to fill *old_pid from the existing file;
+ * for every other failure *old_pid is 0, so callers can safely test it.
+ */
int pidfile_open(char *path, pid_t *old_pid)
{
//nfs version > 3
- int fd = open(path, O_CREAT|O_EXCL|O_WRONLY);
+	int fd;
+
+	/* give the caller a defined value even if pidfile_read() is not reached */
+	if (old_pid)
+		*old_pid = 0;
+	/*
+	 * Owner read+write: with write-only permission a later instance
+	 * running as the same user could not read the pid back.
+	 */
+	fd = open(path, O_CREAT|O_EXCL|O_WRONLY, S_IRUSR|S_IWUSR);
if (fd < 0){
- if (errno == -EEXIST)
+		if (errno == EEXIST)
pidfile_read(path, old_pid);
}
return fd;
}
+/*
+ * Print the command line help to stderr: first the options that main()
+ * parses for every peer, then the peer-specific options through
+ * custom_peer_usage(). argv0 is the program name shown in the first line.
+ */
+void usage(char *argv0)
+{
+	fprintf(stderr, "Usage: %s [general options] [custom peer options]\n\n", argv0);
+	fprintf(stderr, "General peer options:\n"
+		"  Option      | Default | \n"
+		"  --------------------------------------------\n"
+		"    -g        | None    | Segment spec to join\n"
+		"    -sp       | NoPort  | Start portno to bind\n"
+		"    -ep       | NoPort  | End portno to bind\n"
+		"    -p        | NoPort  | Portno to bind\n"
+		"    -dp       | NoPort  | Portno to defer blocking requests to\n"
+		"    -n        | 16      | Number of ops\n"
+		"    -v        | 0       | Verbosity level\n"
+		"    -l        | None    | Logfile \n"
+		"    -d        | No      | Daemonize \n"
+		"    --pidfile | None    | Pidfile \n"
+#ifdef MT
+		/* main() starts with nr_threads = 1 */
+		"    -t        | 1       | Number of threads \n"
+#endif
+		"    -h,--help | No      | Print this help message\n"
+		"\n"
+	       );
+	custom_peer_usage();
+}
+
int main(int argc, char *argv[])
{
struct peerd *peer = NULL;
//parse args
- char *spec = "";
- int i, r, daemonize = 0;
+ int r;
long portno_start = -1, portno_end = -1, portno = -1;
+
//set defaults here
+ int daemonize = 0, help = 0;
uint32_t nr_ops = 16;
- uint32_t nr_threads = 16 ;
+ uint32_t nr_threads = 1;
unsigned int debug_level = 0;
- uint32_t defer_portno = NoPort;
- char *logfile = NULL;
- char *pidfile = NULL;
+ xport defer_portno = NoPort;
pid_t old_pid;
int pid_fd = -1;
+ char spec[MAX_SPEC_LEN + 1];
+ char logfile[MAX_LOGFILE_LEN + 1];
+ char pidfile[MAX_PIDFILE_LEN + 1];
+
+ logfile[0] = 0;
+ pidfile[0] = 0;
+ spec[0] = 0;
+
//capture here -g spec, -n nr_ops, -p portno, -t nr_threads -v verbose level
// -dp xseg_portno to defer blocking requests
// -l log file ?
//TODO print messages on arg parsing error
-
- for (i = 1; i < argc; i++) {
- if (!strcmp(argv[i], "-g") && i + 1 < argc) {
- spec = argv[i+1];
- i += 1;
- continue;
- }
-
- if (!strcmp(argv[i], "-sp") && i + 1 < argc) {
- portno_start = strtoul(argv[i+1], NULL, 10);
- i += 1;
- continue;
- }
-
- if (!strcmp(argv[i], "-ep") && i + 1 < argc) {
- portno_end = strtoul(argv[i+1], NULL, 10);
- i += 1;
- continue;
- }
-
- if (!strcmp(argv[i], "-p") && i + 1 < argc) {
- portno = strtoul(argv[i+1], NULL, 10);
- i += 1;
- continue;
- }
-
- if (!strcmp(argv[i], "-n") && i + 1 < argc) {
- nr_ops = strtoul(argv[i+1], NULL, 10);
- i += 1;
- continue;
- }
- if (!strcmp(argv[i], "-v") && i + 1 < argc ) {
- debug_level = atoi(argv[i+1]);
- i += 1;
- continue;
- }
- if (!strcmp(argv[i], "-t") && i + 1 < argc ) {
- nr_threads = strtoul(argv[i+1], NULL, 10);
- i += 1;
- continue;
- }
- if (!strcmp(argv[i], "-dp") && i + 1 < argc ) {
- defer_portno = strtoul(argv[i+1], NULL, 10);
- i += 1;
- continue;
- }
- if (!strcmp(argv[i], "-l") && i + 1 < argc ) {
- logfile = argv[i+1];
- i += 1;
- continue;
- }
- if (!strcmp(argv[i], "-d")) {
- daemonize = 1;
- continue;
- }
- if (!strcmp(argv[i], "--pidfile") && i + 1 < argc ) {
- pidfile = argv[i+1];
- i += 1;
- continue;
- }
+ BEGIN_READ_ARGS(argc, argv);
+ READ_ARG_STRING("-g", spec, MAX_SPEC_LEN);
+ READ_ARG_ULONG("-sp", portno_start);
+ READ_ARG_ULONG("-ep", portno_end);
+ READ_ARG_ULONG("-p", portno);
+ READ_ARG_ULONG("-n", nr_ops);
+ READ_ARG_ULONG("-v", debug_level);
+#ifdef MT
+ READ_ARG_ULONG("-t", nr_threads);
+#endif
+ READ_ARG_ULONG("-dp", defer_portno);
+ READ_ARG_STRING("-l", logfile, MAX_LOGFILE_LEN);
+ READ_ARG_BOOL("-d", daemonize);
+ READ_ARG_BOOL("-h", help);
+ READ_ARG_BOOL("--help", help);
+ READ_ARG_STRING("--pidfile", pidfile, MAX_PIDFILE_LEN);
+ END_READ_ARGS();
+
+ if (help){
+ usage(argv[0]);
+ return 0;
+ }
+ r = init_logctx(&lc, argv[0], debug_level, logfile,
+ REDIRECT_STDOUT|REDIRECT_STDERR);
+ if (r < 0){
+ XSEGLOG("Cannot initialize logging to logfile");
+ return -1;
}
- init_logctx(&lc, argv[0], debug_level, logfile);
XSEGLOG2(&lc, D, "Main thread has tid %ld.\n", syscall(SYS_gettid));
-
- if (pidfile){
+
+ if (pidfile[0]){
pid_fd = pidfile_open(pidfile, &old_pid);
if (pid_fd < 0) {
if (old_pid) {
return -1;
}
}
-
+
if (daemonize){
if (daemon(0, 1) < 0){
XSEGLOG2(&lc, E, "Cannot daemonize");
- if (pid_fd > 0)
- pidfile_remove(pidfile, pid_fd);
- return -1;
-
+ r = -1;
+ goto out;
}
}
pidfile_write(pid_fd);
-
+
//TODO perform argument sanity checks
verbose = debug_level;
if (portno != -1) {
portno_start = portno;
portno_end = portno;
}
+ if (portno_start == -1 || portno_end == -1){
+ XSEGLOG2(&lc, E, "Portno or {portno_start, portno_end} must be supplied");
+ usage(argv[0]);
+ r = -1;
+ goto out;
+ }
- setup_signals();
- //TODO err check
peer = peerd_init(nr_ops, spec, portno_start, portno_end, nr_threads, defer_portno);
- if (!peer)
- return -1;
+ if (!peer){
+ r = -1;
+ goto out;
+ }
+ setup_signals(peer);
r = custom_peer_init(peer, argc, argv);
if (r < 0)
- return -1;
-#ifdef MT
+ goto out;
+#if defined(MT)
+ //TODO err check
peerd_start_threads(peer);
-#endif
-
-#ifdef ST_THREADS
- st_thread_t st = st_thread_create(peerd_loop, peer, 1, 0);
+#elif defined(ST_THREADS)
+ st_thread_t st = st_thread_create(init_peerd_loop, peer, 1, 0);
r = st_thread_join(st, NULL);
#else
- r = peerd_loop(peer);
+ r = init_peerd_loop(peer);
#endif
+out:
if (pid_fd > 0)
pidfile_remove(pidfile, pid_fd);
return r;