#include <unistd.h>
#include <sys/syscall.h>
#include <sys/time.h>
+#include <sys/resource.h>
#include <signal.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <errno.h>
+#include <sched.h>
#ifdef MT
#include <pthread.h>
#endif
#ifdef MT
#define PEER_TYPE "pthread"
+#elif defined(FD)
+#define PEER_TYPE "posixfd"
#else
#define PEER_TYPE "posix"
#endif
//FIXME this should not be defined here probably
#define MAX_SPEC_LEN 128
#define MAX_PIDFILE_LEN 512
+#define MAX_CPUS_LEN 512
+/*
+ * CPUs on which the threads/process will be pinned. Filled in by
+ * get_cpu_list() from the string given with the --cpus argument.
+ */
+struct cpu_list {
+ int cpus[48]; /* CPU numbers; entry N pins thread N, entry 0 pins a single process */
+ int len; /* number of entries parsed into cpus[]; pinning is skipped when 0 */
+};
+
+struct cpu_list cpu_list; /* the instance consulted by the pinning code */
volatile unsigned int terminated = 0;
unsigned int verbose = 0;
struct log_ctx lc;
#endif
}
+/*
+ * SIGSEGV handler: print a backtrace and still dump the core.
+ *
+ * A forked child prints its own stack trace, which should be the same as the
+ * parent's, and then exits with status 1. The parent sleeps for one second
+ * before calling abort(); aborting immediately might interrupt the child
+ * while it is still printing the backtrace.
+ */
+void segv_handler(int signal)
+{
+	pid_t child = fork();
+
+	if (child != 0) {
+		/* Parent (or fork() failed): give the trace time to print, then abort */
+		sleep(1);
+		abort();
+	}
+
+	/* Child: print the trace and leave without running exit handlers */
+	xseg_printtrace();
+	_exit(1);
+}
+
+
void renew_logfile(int signal)
{
// XSEGLOG2(&lc, I, "Caught signal. Renewing logfile");
{
int r;
struct sigaction sa;
+ struct rlimit rlim;
#ifdef MT
global_peer = peer;
#endif
if (r < 0)
return r;
+ /*
+ * Get the current limits for core files and raise them to the largest
+ * value possible
+ */
+ if (getrlimit(RLIMIT_CORE, &rlim) < 0)
+ return r;
+
+ rlim.rlim_cur = rlim.rlim_max;
+
+ if (setrlimit(RLIMIT_CORE, &rlim) < 0)
+ return r;
+
+ /* Install handler for segfaults */
+ sa.sa_handler = segv_handler;
+ r = sigaction(SIGSEGV, &sa, NULL);
+ if (r < 0)
+ return r;
+
sa.sa_handler = renew_logfile;
r = sigaction(SIGUSR1, &sa, NULL);
if (r < 0)
loops = threshold;
}
XSEGLOG2(&lc, I, "Thread %u goes to sleep\n", (unsigned int) (t- peer->thread));
- xseg_wait_signal(xseg, 10000000UL);
+ xseg_wait_signal(xseg, peer->sd, 10000000UL);
xseg_cancel_wait(xseg, peer->portno_start);
XSEGLOG2(&lc, I, "Thread %u woke up\n", (unsigned int) (t- peer->thread));
}
{
struct thread *t = (struct thread *) arg;
struct peerd *peer = t->peer;
+ pthread_t thread = pthread_self();
+ long int thread_num = t - t->peer->thread;
char *thread_id;
int i;
+ cpu_set_t mask;
+ int r;
+
+ if (cpu_list.len) {
+ CPU_ZERO(&mask);
+ CPU_SET(cpu_list.cpus[thread_num], &mask);
+ r = pthread_setaffinity_np(thread, sizeof(mask), &mask);
+ if (r < 0) {
+ perror("sched_setaffinity");
+ return NULL;
+ }
+ }
+
+
/*
* We need an identifier for every thread that will spin in peerd_loop.
* The following code is a way to create a string of this format:
* chars should be more than enough.
*/
thread_id = malloc(13 * sizeof(char));
- snprintf(thread_id, 13, "Thread %ld", t - t->peer->thread);
+ snprintf(thread_id, 13, "Thread %ld", thread_num);
for (i = 0; thread_id[i]; i++) {}
t->arg = (void *)realloc(thread_id, i-1);
pthread_setspecific(threadkey, t);
xport portno_start = peer->portno_start;
xport portno_end = peer->portno_end;
pid_t pid = syscall(SYS_gettid);
- uint64_t threshold=1000/(1 + portno_end - portno_start);
+ uint64_t threshold = peer->threshold;
+ threshold /= (1 + portno_end - portno_start);
+ threshold += 1;
uint64_t loops;
XSEGLOG2(&lc, I, "%s has tid %u.\n", id, pid);
}
#endif
XSEGLOG2(&lc, I, "%s goes to sleep\n", id);
- xseg_wait_signal(xseg, 10000000UL);
+ xseg_wait_signal(xseg, peer->sd, 10000000UL);
xseg_cancel_wait(xseg, peer->portno_start);
XSEGLOG2(&lc, I, "%s woke up\n", id);
}
static int init_peerd_loop(struct peerd *peer)
{
struct xseg *xseg = peer->xseg;
+ cpu_set_t mask;
+ int r;
+
+ if (cpu_list.len) {
+ CPU_ZERO(&mask);
+ CPU_SET(cpu_list.cpus[0], &mask);
+
+ r = sched_setaffinity(0, sizeof(mask), &mask);
+ if (r < 0) {
+ perror("sched_setaffinity");
+ return -1;
+ }
+ }
peer->peerd_loop(peer);
custom_peer_finalize(peer);
}
static struct peerd* peerd_init(uint32_t nr_ops, char* spec, long portno_start,
- long portno_end, uint32_t nr_threads, xport defer_portno)
+ long portno_end, uint32_t nr_threads, xport defer_portno,
+ uint64_t threshold)
{
int i;
struct peerd *peer;
}
peer->nr_ops = nr_ops;
peer->defer_portno = defer_portno;
+ peer->threshold = threshold;
#ifdef MT
peer->nr_threads = nr_threads;
peer->thread = calloc(nr_threads, sizeof(struct thread));
#else
if (!xq_alloc_seq(&peer->free_reqs, nr_ops, nr_ops))
goto malloc_fail;
+ if (peer->free_reqs.size < peer->nr_ops)
+ peer->nr_ops = peer->free_reqs.size;
#endif
peer->peer_reqs = calloc(nr_ops, sizeof(struct peer_req));
if (!peer->peer_reqs){
* The first port we bind will have its signal_desc initialized by xseg
* and the same signal_desc will be used for all the other ports.
*/
+ peer->sd = NULL;
for (p = peer->portno_start; p <= peer->portno_end; p++) {
- port = xseg_bind_port(peer->xseg, p, sd);
+ port = xseg_bind_port(peer->xseg, p, peer->sd);
if (!port){
printf("cannot bind to port %u\n", (unsigned int) p);
return NULL;
}
if (p == peer->portno_start)
- sd = xseg_get_signal_desc(peer->xseg, port);
+ peer->sd = xseg_get_signal_desc(peer->xseg, port);
}
printf("Peer on ports %u-%u\n", peer->portno_start, peer->portno_end);
return fd;
}
+/*
+ * Parse a comma-separated list of CPU numbers into cpu_list.
+ *
+ * cpus     - NUL-terminated string such as "0,2,5". It is modified in place
+ *            by strtok().
+ * cpu_list - receives the parsed CPU numbers in cpus[] and their count in
+ *            len.
+ *
+ * Returns 0 on success. Returns -1 when a token is not a non-negative decimal
+ * number that fits in an int, or when the list holds more entries than
+ * cpu_list->cpus can store. On failure cpu_list->len is left untouched.
+ */
+int get_cpu_list(char *cpus, struct cpu_list *cpu_list)
+{
+	const int max_cpus = sizeof(cpu_list->cpus) / sizeof(cpu_list->cpus[0]);
+	char *tok, *rem;
+	long val;
+	int i = 0;
+
+	/* strtok() takes the string on the first call and NULL afterwards */
+	for (tok = strtok(cpus, ","); tok != NULL; tok = strtok(NULL, ",")) {
+		/* Refuse to write past the end of the fixed-size array */
+		if (i >= max_cpus)
+			return -1;
+
+		errno = 0;
+		val = strtol(tok, &rem, 10);
+		if (rem == tok || *rem != '\0') /* Not a number */
+			return -1;
+		/* A CPU number must be non-negative and survive the narrowing to int */
+		if (errno == ERANGE || val < 0 || val != (long)(int)val)
+			return -1;
+
+		cpu_list->cpus[i++] = (int)val;
+	}
+
+	cpu_list->len = i;
+
+	return 0;
+}
+
+
void usage(char *argv0)
{
fprintf(stderr, "Usage: %s [general options] [custom peer options]\n\n", argv0);
#ifdef MT
" -t | No | Number of threads \n"
#endif
+ " --cpus | No | Coma-separated list of CPUs\n"
+ " | | to pin the process or threads\n"
"\n"
);
custom_peer_usage();
int daemonize = 0, help = 0;
uint32_t nr_ops = 16;
uint32_t nr_threads = 1;
+ uint64_t threshold = 1000;
unsigned int debug_level = 0;
xport defer_portno = NoPort;
pid_t old_pid;
char spec[MAX_SPEC_LEN + 1];
char logfile[MAX_LOGFILE_LEN + 1];
char pidfile[MAX_PIDFILE_LEN + 1];
+ char cpus[MAX_CPUS_LEN + 1];
logfile[0] = 0;
pidfile[0] = 0;
spec[0] = 0;
+ cpus[0] = 0;
//capture here -g spec, -n nr_ops, -p portno, -t nr_threads -v verbose level
// -dp xseg_portno to defer blocking requests
READ_ARG_BOOL("-d", daemonize);
READ_ARG_BOOL("-h", help);
READ_ARG_BOOL("--help", help);
+ READ_ARG_ULONG("--threshold", threshold);
+ READ_ARG_STRING("--cpus", cpus, MAX_CPUS_LEN);
READ_ARG_STRING("--pidfile", pidfile, MAX_PIDFILE_LEN);
END_READ_ARGS();
pidfile_write(pid_fd);
+ if (cpus[0]) {
+ r = get_cpu_list(cpus, &cpu_list);
+
+ if (r < 0) {
+ XSEGLOG2(&lc, E, "--cpus %s: Invalid input", cpus);
+ goto out;
+ }
+#ifdef MT
+ if (nr_threads != cpu_list.len) {
+ XSEGLOG2(&lc, E, "--cpus %s: Number of "
+ "threads (%d) and CPUs don't match",
+ cpus, nr_threads);
+ goto out;
+ }
+#else
+ if (cpu_list.len != 1) {
+ XSEGLOG2(&lc, E, "--cpus %s: Too many CPUs for a "
+ "single process", cpus);
+ goto out;
+ }
+#endif
+ }
+
+
//TODO perform argument sanity checks
verbose = debug_level;
if (portno != -1) {
goto out;
}
- peer = peerd_init(nr_ops, spec, portno_start, portno_end, nr_threads, defer_portno);
+ peer = peerd_init(nr_ops, spec, portno_start, portno_end, nr_threads, defer_portno, threshold);
if (!peer){
r = -1;
goto out;