8 #include <sys/syscall.h>
16 #define PEER_TYPE "pthread"
18 #define PEER_TYPE "posix"
21 volatile unsigned int terminated = 0;
22 unsigned int verbose = 0;
34 void (*func)(void *arg);
39 inline static struct thread* alloc_thread(struct peerd *peer)
41 xqindex idx = xq_pop_head(&peer->threads, 1);
44 return peer->thread + idx;
47 inline static void free_thread(struct peerd *peer, struct thread *t)
49 xqindex idx = t - peer->thread;
50 xq_append_head(&peer->threads, idx, 1);
54 inline static void __wake_up_thread(struct thread *t)
56 pthread_mutex_lock(&t->lock);
57 pthread_cond_signal(&t->cond);
58 pthread_mutex_unlock(&t->lock);
61 inline static void wake_up_thread(struct thread* t)
68 inline static int wake_up_next_thread(struct peerd *peer)
70 return (xseg_signal(peer->xseg, peer->portno_start));
75 static inline int isTerminate()
77 /* ta doesn't need to be taken into account, because the main loops
78 * doesn't check the terminated flag if ta is not 0.
81 return (!ta & terminated);
89 void signal_handler(int signal)
91 XSEGLOG2(&lc, I, "Caught signal. Terminating gracefully");
95 static int setup_signals()
99 sigemptyset(&sa.sa_mask);
101 sa.sa_handler = signal_handler;
102 r = sigaction(SIGTERM, &sa, NULL);
105 r = sigaction(SIGINT, &sa, NULL);
108 r = sigaction(SIGQUIT, &sa, NULL);
114 inline int canDefer(struct peerd *peer)
116 return !(peer->defer_portno == NoPort);
119 void print_req(struct xseg *xseg, struct xseg_request *req)
121 char target[64], data[64];
122 char *req_target, *req_data;
123 unsigned int end = (req->targetlen> 63) ? 63 : req->targetlen;
124 req_target = xseg_get_target(xseg, req);
125 req_data = xseg_get_data(xseg, req);
128 strncpy(target, req_target, end);
130 strncpy(data, req_data, 63);
132 printf("req id:%lu, op:%u %llu:%lu serviced: %lu, reqstate: %u\n"
133 "src: %u, st: %u, dst: %u dt: %u\n"
134 "target[%u]:'%s', data[%llu]:\n%s------------------\n\n",
135 (unsigned long)(req),
136 (unsigned int)req->op,
137 (unsigned long long)req->offset,
138 (unsigned long)req->size,
139 (unsigned long)req->serviced,
140 (unsigned int)req->state,
141 (unsigned int)req->src_portno,
142 (unsigned int)req->src_transit_portno,
143 (unsigned int)req->dst_portno,
144 (unsigned int)req->dst_transit_portno,
145 (unsigned int)req->targetlen, target,
146 (unsigned long long)req->datalen, data);
149 void log_pr(char *msg, struct peer_req *pr)
151 char target[64], data[64];
152 char *req_target, *req_data;
153 struct peerd *peer = pr->peer;
154 struct xseg *xseg = pr->peer->xseg;
155 req_target = xseg_get_target(xseg, pr->req);
156 req_data = xseg_get_data(xseg, pr->req);
157 /* null terminate name in case of req->target is less than 63 characters,
158 * and next character after name (aka first byte of next buffer) is not
161 unsigned int end = (pr->req->targetlen> 63) ? 63 : pr->req->targetlen;
163 strncpy(target, req_target, end);
165 strncpy(data, req_data, 63);
167 printf("%s: req id:%u, op:%u %llu:%lu serviced: %lu, retval: %lu, reqstate: %u\n"
168 "target[%u]:'%s', data[%llu]:\n%s------------------\n\n",
170 (unsigned int)(pr - peer->peer_reqs),
171 (unsigned int)pr->req->op,
172 (unsigned long long)pr->req->offset,
173 (unsigned long)pr->req->size,
174 (unsigned long)pr->req->serviced,
175 (unsigned long)pr->retval,
176 (unsigned int)pr->req->state,
177 (unsigned int)pr->req->targetlen, target,
178 (unsigned long long)pr->req->datalen, data);
182 inline struct peer_req *alloc_peer_req(struct peerd *peer)
184 xqindex idx = xq_pop_head(&peer->free_reqs, 1);
187 return peer->peer_reqs + idx;
190 inline void free_peer_req(struct peerd *peer, struct peer_req *pr)
192 xqindex idx = pr - peer->peer_reqs;
194 xq_append_head(&peer->free_reqs, idx, 1);
197 struct timeval resp_start, resp_end, resp_accum = {0, 0};
198 uint64_t responds = 0;
199 void get_responds_stats(){
200 printf("Time waiting respond %lu.%06lu sec for %llu times.\n",
201 //(unsigned int)(t - peer->thread),
202 resp_accum.tv_sec, resp_accum.tv_usec, (long long unsigned int) responds);
206 void fail(struct peerd *peer, struct peer_req *pr)
208 struct xseg_request *req = pr->req;
210 XSEGLOG2(&lc, D, "failing req %u", (unsigned int) (pr - peer->peer_reqs));
211 req->state |= XS_FAILED;
212 //xseg_set_req_data(peer->xseg, pr->req, NULL);
213 p = xseg_respond(peer->xseg, req, pr->portno, X_ALLOC);
214 xseg_signal(peer->xseg, p);
215 free_peer_req(peer, pr);
217 wake_up_next_thread(peer);
222 void complete(struct peerd *peer, struct peer_req *pr)
224 struct xseg_request *req = pr->req;
226 req->state |= XS_SERVED;
227 //xseg_set_req_data(peer->xseg, pr->req, NULL);
228 //gettimeofday(&resp_start, NULL);
229 p = xseg_respond(peer->xseg, req, pr->portno, X_ALLOC);
230 //gettimeofday(&resp_end, NULL);
232 //timersub(&resp_end, &resp_start, &resp_end);
233 //timeradd(&resp_end, &resp_accum, &resp_accum);
234 //printf("xseg_signal: %u\n", p);
235 xseg_signal(peer->xseg, p);
236 free_peer_req(peer, pr);
238 wake_up_next_thread(peer);
242 static void handle_accepted(struct peerd *peer, struct peer_req *pr,
243 struct xseg_request *req)
245 struct xseg_request *xreq = pr->req;
246 //assert xreq == req;
247 XSEGLOG2(&lc, D, "Handle accepted");
249 //xreq->state = XS_ACCEPTED;
251 dispatch(peer, pr, req, dispatch_accept);
254 static void handle_received(struct peerd *peer, struct peer_req *pr,
255 struct xseg_request *req)
257 //struct xseg_request *req = pr->req;
258 //assert req->state != XS_ACCEPTED;
259 XSEGLOG2(&lc, D, "Handle received \n");
260 dispatch(peer, pr, req, dispatch_receive);
263 struct timeval sub_start, sub_end, sub_accum = {0, 0};
264 uint64_t submits = 0;
265 void get_submits_stats(){
266 printf("Time waiting submit %lu.%06lu sec for %llu times.\n",
267 //(unsigned int)(t - peer->thread),
268 sub_accum.tv_sec, sub_accum.tv_usec, (long long unsigned int) submits);
271 int submit_peer_req(struct peerd *peer, struct peer_req *pr)
274 struct xseg_request *req = pr->req;
275 // assert req->portno == peer->portno ?
276 //TODO small function with error checking
277 XSEGLOG2 (&lc, D, "submitting peer req %u\n", (unsigned int)(pr - peer->peer_reqs));
278 ret = xseg_set_req_data(peer->xseg, req, (void *)(pr));
281 //printf("pr: %x , req_data: %x \n", pr, xseg_get_req_data(peer->xseg, req));
282 //gettimeofday(&sub_start, NULL);
283 ret = xseg_submit(peer->xseg, req, pr->portno, X_ALLOC);
284 //gettimeofday(&sub_end, NULL);
286 //timersub(&sub_end, &sub_start, &sub_end);
287 //timeradd(&sub_end, &sub_accum, &sub_accum);
290 xseg_signal(peer->xseg, ret);
294 static int check_ports(struct peerd *peer)
296 struct xseg *xseg = peer->xseg;
297 xport portno_start = peer->portno_start;
298 xport portno_end = peer->portno_end;
299 struct xseg_request *accepted, *received;
304 for (i = portno_start; i <= portno_end; i++) {
307 if (!isTerminate()) {
308 pr = alloc_peer_req(peer);
310 accepted = xseg_accept(xseg, i, X_NONBLOCK);
314 xseg_cancel_wait(xseg, i);
315 handle_accepted(peer, pr, accepted);
319 free_peer_req(peer, pr);
323 received = xseg_receive(xseg, i, X_NONBLOCK);
325 r = xseg_get_req_data(xseg, received, (void **) &pr);
327 XSEGLOG2(&lc, W, "Received request with no pr data\n");
328 xport p = xseg_respond(peer->xseg, received, peer->portno_start, X_ALLOC);
330 XSEGLOG2(&lc, W, "Could not respond stale request");
331 xseg_put_request(xseg, received, portno_start);
334 xseg_signal(xseg, p);
337 //maybe perform sanity check for pr
338 xseg_cancel_wait(xseg, i);
339 handle_received(peer, pr, received);
349 int thread_execute(struct peerd *peer, void (*func)(void *arg), void *arg)
351 struct thread *t = alloc_thread(peer);
358 // we could hijack a thread
362 static void* thread_loop(void *arg)
364 struct thread *t = (struct thread *) arg;
365 struct peerd *peer = t->peer;
366 struct xseg *xseg = peer->xseg;
367 xport portno_start = peer->portno_start;
368 xport portno_end = peer->portno_end;
369 pid_t pid =syscall(SYS_gettid);
371 uint64_t threshold=1000/(1 + portno_end - portno_start);
373 XSEGLOG2(&lc, D, "thread %u\n", (unsigned int) (t- peer->thread));
375 XSEGLOG2(&lc, I, "Thread %u has tid %u.\n", (unsigned int) (t- peer->thread), pid);
376 xseg_init_local_signal(xseg, peer->portno_start);
377 for (;!(isTerminate() && xq_count(&peer->free_reqs) == peer->nr_ops);) {
379 XSEGLOG2(&lc, D, "Thread %u executes function\n", (unsigned int) (t- peer->thread));
380 xseg_cancel_wait(xseg, peer->portno_start);
387 for(loops= threshold; loops > 0; loops--) {
389 xseg_prepare_wait(xseg, peer->portno_start);
390 if (check_ports(peer))
393 XSEGLOG2(&lc, I, "Thread %u goes to sleep\n", (unsigned int) (t- peer->thread));
394 xseg_wait_signal(xseg, 10000000UL);
395 xseg_cancel_wait(xseg, peer->portno_start);
396 XSEGLOG2(&lc, I, "Thread %u woke up\n", (unsigned int) (t- peer->thread));
401 int peerd_start_threads(struct peerd *peer)
404 uint32_t nr_threads = peer->nr_threads;
406 for (i = 0; i < nr_threads; i++) {
407 peer->thread[i].peer = peer;
408 pthread_cond_init(&peer->thread[i].cond,NULL);
409 pthread_mutex_init(&peer->thread[i].lock, NULL);
410 pthread_create(&peer->thread[i].tid, NULL, thread_loop, (void *)(peer->thread + i));
411 peer->thread[i].func = NULL;
412 peer->thread[i].arg = NULL;
419 void defer_request(struct peerd *peer, struct peer_req *pr)
421 // assert canDefer(peer);
422 // xseg_submit(peer->xseg, peer->defer_portno, pr->req);
423 // xseg_signal(peer->xseg, peer->defer_portno);
424 // free_peer_req(peer, pr);
427 static int peerd_loop(struct peerd *peer)
431 if (peer->interactive_func)
432 peer->interactive_func();
433 for (i = 0; i < peer->nr_threads; i++) {
434 pthread_join(peer->thread[i].tid, NULL);
437 struct xseg *xseg = peer->xseg;
438 xport portno_start = peer->portno_start;
439 xport portno_end = peer->portno_end;
440 uint64_t threshold=1000/(1 + portno_end - portno_start);
441 pid_t pid =syscall(SYS_gettid);
444 XSEGLOG2(&lc, I, "Peer has tid %u.\n", pid);
445 xseg_init_local_signal(xseg, peer->portno_start);
446 for (;!(isTerminate() && xq_count(&peer->free_reqs) == peer->nr_ops);) {
447 for(loops= threshold; loops > 0; loops--) {
449 xseg_prepare_wait(xseg, peer->portno_start);
450 if (check_ports(peer))
458 XSEGLOG2(&lc, I, "Peer goes to sleep\n");
459 xseg_wait_signal(xseg, 10000000UL);
460 xseg_cancel_wait(xseg, peer->portno_start);
461 XSEGLOG2(&lc, I, "Peer woke up\n");
466 xseg_quit_local_signal(xseg, peer->portno_start);
471 static struct xseg *join(char *spec)
473 struct xseg_config config;
476 (void)xseg_parse_spec(spec, &config);
477 xseg = xseg_join(config.type, config.name, PEER_TYPE, NULL);
481 (void)xseg_create(&config);
482 return xseg_join(config.type, config.name, PEER_TYPE, NULL);
485 static struct peerd* peerd_init(uint32_t nr_ops, char* spec, long portno_start,
486 long portno_end, uint32_t nr_threads, uint32_t defer_portno)
490 struct xseg_port *port;
495 peer = malloc(sizeof(struct peerd));
500 peer->nr_ops = nr_ops;
501 peer->defer_portno = defer_portno;
503 peer->nr_threads = nr_threads;
504 peer->thread = calloc(nr_threads, sizeof(struct thread));
508 peer->peer_reqs = calloc(nr_ops, sizeof(struct peer_req));
509 if (!peer->peer_reqs){
515 if (!xq_alloc_seq(&peer->free_reqs, nr_ops, nr_ops))
518 if (!xq_alloc_empty(&peer->threads, nr_threads))
521 if (xseg_initialize()){
522 printf("cannot initialize library\n");
525 peer->xseg = join(spec);
529 peer->portno_start = (xport) portno_start;
530 peer->portno_end= (xport) portno_end;
531 port = xseg_bind_port(peer->xseg, peer->portno_start, NULL);
533 printf("cannot bind to port %ld\n", peer->portno_start);
538 for (p = peer->portno_start + 1; p <= peer->portno_end; p++) {
539 struct xseg_port *tmp;
540 tmp = xseg_bind_port(peer->xseg, p, (void *)xseg_get_signal_desc(peer->xseg, port));
542 printf("cannot bind to port %ld\n", p);
547 printf("Peer on ports %u-%u\n", peer->portno_start,
550 for (i = 0; i < nr_ops; i++) {
551 peer->peer_reqs[i].peer = peer;
552 peer->peer_reqs[i].req = NULL;
553 peer->peer_reqs[i].retval = 0;
554 peer->peer_reqs[i].priv = NULL;
555 peer->peer_reqs[i].portno = NoPort;
557 peer->peer_reqs[i].cond = st_cond_new(); //FIXME err check
561 peer->interactive_func = NULL;
567 int main(int argc, char *argv[])
569 struct peerd *peer = NULL;
573 long portno_start = -1, portno_end = -1, portno = -1;
575 uint32_t nr_ops = 16;
576 uint32_t nr_threads = 16 ;
577 unsigned int debug_level = 0;
578 uint32_t defer_portno = NoPort;
579 char *logfile = NULL;
581 //capture here -g spec, -n nr_ops, -p portno, -t nr_threads -v verbose level
582 // -dp xseg_portno to defer blocking requests
584 //TODO print messages on arg parsing error
586 for (i = 1; i < argc; i++) {
587 if (!strcmp(argv[i], "-g") && i + 1 < argc) {
593 if (!strcmp(argv[i], "-sp") && i + 1 < argc) {
594 portno_start = strtoul(argv[i+1], NULL, 10);
599 if (!strcmp(argv[i], "-ep") && i + 1 < argc) {
600 portno_end = strtoul(argv[i+1], NULL, 10);
605 if (!strcmp(argv[i], "-p") && i + 1 < argc) {
606 portno = strtoul(argv[i+1], NULL, 10);
611 if (!strcmp(argv[i], "-n") && i + 1 < argc) {
612 nr_ops = strtoul(argv[i+1], NULL, 10);
616 if (!strcmp(argv[i], "-v") && i + 1 < argc ) {
617 debug_level = atoi(argv[i+1]);
621 if (!strcmp(argv[i], "-t") && i + 1 < argc ) {
622 nr_threads = strtoul(argv[i+1], NULL, 10);
626 if (!strcmp(argv[i], "-dp") && i + 1 < argc ) {
627 defer_portno = strtoul(argv[i+1], NULL, 10);
631 if (!strcmp(argv[i], "-l") && i + 1 < argc ) {
638 init_logctx(&lc, argv[0], debug_level, logfile);
639 XSEGLOG2(&lc, D, "Main thread has tid %ld.\n", syscall(SYS_gettid));
641 //TODO perform argument sanity checks
642 verbose = debug_level;
644 portno_start = portno;
650 peer = peerd_init(nr_ops, spec, portno_start, portno_end, nr_threads, defer_portno);
653 r = custom_peer_init(peer, argc, argv);
657 peerd_start_threads(peer);
661 st_thread_t st = st_thread_create(peerd_loop, peer, 1, 0);
662 return st_thread_join(st, NULL);
664 return peerd_loop(peer);