8 #include <sys/syscall.h>
19 #define PEER_TYPE "pthread"
21 #define PEER_TYPE "posix"
24 volatile unsigned int terminated = 0;
25 unsigned int verbose = 0;
32 struct peerd *global_peer;
39 void (*func)(void *arg);
44 inline static struct thread* alloc_thread(struct peerd *peer)
46 xqindex idx = xq_pop_head(&peer->threads, 1);
49 return peer->thread + idx;
52 inline static void free_thread(struct peerd *peer, struct thread *t)
54 xqindex idx = t - peer->thread;
55 xq_append_head(&peer->threads, idx, 1);
59 inline static void __wake_up_thread(struct thread *t)
61 pthread_mutex_lock(&t->lock);
62 pthread_cond_signal(&t->cond);
63 pthread_mutex_unlock(&t->lock);
66 inline static void wake_up_thread(struct thread* t)
73 inline static int wake_up_next_thread(struct peerd *peer)
75 return (xseg_signal(peer->xseg, peer->portno_start));
80 static inline int isTerminate()
82 /* ta doesn't need to be taken into account, because the main loops
83 * doesn't check the terminated flag if ta is not 0.
87 return (!ta & terminated);
95 void signal_handler(int signal)
97 XSEGLOG2(&lc, I, "Caught signal. Terminating gracefully");
100 wake_up_next_thread(global_peer);
104 void renew_logfile(int signal)
106 XSEGLOG2(&lc, I, "Caught signal. Renewing logfile");
107 renew_logctx(&lc, NULL, lc.log_level, NULL, REOPEN_FILE);
110 static int setup_signals(struct peerd *peer)
117 sigemptyset(&sa.sa_mask);
119 sa.sa_handler = signal_handler;
120 r = sigaction(SIGTERM, &sa, NULL);
123 r = sigaction(SIGINT, &sa, NULL);
126 r = sigaction(SIGQUIT, &sa, NULL);
130 sa.sa_handler = renew_logfile;
131 r = sigaction(SIGUSR1, &sa, NULL);
138 inline int canDefer(struct peerd *peer)
140 return !(peer->defer_portno == NoPort);
143 void print_req(struct xseg *xseg, struct xseg_request *req)
145 char target[64], data[64];
146 char *req_target, *req_data;
147 unsigned int end = (req->targetlen> 63) ? 63 : req->targetlen;
148 req_target = xseg_get_target(xseg, req);
149 req_data = xseg_get_data(xseg, req);
152 strncpy(target, req_target, end);
154 strncpy(data, req_data, 63);
156 printf("req id:%lu, op:%u %llu:%lu serviced: %lu, reqstate: %u\n"
157 "src: %u, st: %u, dst: %u dt: %u\n"
158 "target[%u]:'%s', data[%llu]:\n%s------------------\n\n",
159 (unsigned long)(req),
160 (unsigned int)req->op,
161 (unsigned long long)req->offset,
162 (unsigned long)req->size,
163 (unsigned long)req->serviced,
164 (unsigned int)req->state,
165 (unsigned int)req->src_portno,
166 (unsigned int)req->src_transit_portno,
167 (unsigned int)req->dst_portno,
168 (unsigned int)req->dst_transit_portno,
169 (unsigned int)req->targetlen, target,
170 (unsigned long long)req->datalen, data);
173 void log_pr(char *msg, struct peer_req *pr)
175 char target[64], data[64];
176 char *req_target, *req_data;
177 struct peerd *peer = pr->peer;
178 struct xseg *xseg = pr->peer->xseg;
179 req_target = xseg_get_target(xseg, pr->req);
180 req_data = xseg_get_data(xseg, pr->req);
181 /* null terminate name in case of req->target is less than 63 characters,
182 * and next character after name (aka first byte of next buffer) is not
185 unsigned int end = (pr->req->targetlen> 63) ? 63 : pr->req->targetlen;
187 strncpy(target, req_target, end);
189 strncpy(data, req_data, 63);
191 printf("%s: req id:%u, op:%u %llu:%lu serviced: %lu, retval: %lu, reqstate: %u\n"
192 "target[%u]:'%s', data[%llu]:\n%s------------------\n\n",
194 (unsigned int)(pr - peer->peer_reqs),
195 (unsigned int)pr->req->op,
196 (unsigned long long)pr->req->offset,
197 (unsigned long)pr->req->size,
198 (unsigned long)pr->req->serviced,
199 (unsigned long)pr->retval,
200 (unsigned int)pr->req->state,
201 (unsigned int)pr->req->targetlen, target,
202 (unsigned long long)pr->req->datalen, data);
206 inline struct peer_req *alloc_peer_req(struct peerd *peer)
208 xqindex idx = xq_pop_head(&peer->free_reqs, 1);
211 return peer->peer_reqs + idx;
214 inline void free_peer_req(struct peerd *peer, struct peer_req *pr)
216 xqindex idx = pr - peer->peer_reqs;
218 xq_append_head(&peer->free_reqs, idx, 1);
221 struct timeval resp_start, resp_end, resp_accum = {0, 0};
222 uint64_t responds = 0;
223 void get_responds_stats(){
224 printf("Time waiting respond %lu.%06lu sec for %llu times.\n",
225 //(unsigned int)(t - peer->thread),
226 resp_accum.tv_sec, resp_accum.tv_usec, (long long unsigned int) responds);
230 void fail(struct peerd *peer, struct peer_req *pr)
232 struct xseg_request *req = pr->req;
234 XSEGLOG2(&lc, D, "failing req %u", (unsigned int) (pr - peer->peer_reqs));
235 req->state |= XS_FAILED;
236 //xseg_set_req_data(peer->xseg, pr->req, NULL);
237 p = xseg_respond(peer->xseg, req, pr->portno, X_ALLOC);
238 xseg_signal(peer->xseg, p);
239 free_peer_req(peer, pr);
241 wake_up_next_thread(peer);
246 void complete(struct peerd *peer, struct peer_req *pr)
248 struct xseg_request *req = pr->req;
250 req->state |= XS_SERVED;
251 //xseg_set_req_data(peer->xseg, pr->req, NULL);
252 //gettimeofday(&resp_start, NULL);
253 p = xseg_respond(peer->xseg, req, pr->portno, X_ALLOC);
254 //gettimeofday(&resp_end, NULL);
256 //timersub(&resp_end, &resp_start, &resp_end);
257 //timeradd(&resp_end, &resp_accum, &resp_accum);
258 //printf("xseg_signal: %u\n", p);
259 xseg_signal(peer->xseg, p);
260 free_peer_req(peer, pr);
262 wake_up_next_thread(peer);
266 static void handle_accepted(struct peerd *peer, struct peer_req *pr,
267 struct xseg_request *req)
269 struct xseg_request *xreq = pr->req;
270 //assert xreq == req;
271 XSEGLOG2(&lc, D, "Handle accepted");
273 //xreq->state = XS_ACCEPTED;
275 dispatch(peer, pr, req, dispatch_accept);
278 static void handle_received(struct peerd *peer, struct peer_req *pr,
279 struct xseg_request *req)
281 //struct xseg_request *req = pr->req;
282 //assert req->state != XS_ACCEPTED;
283 XSEGLOG2(&lc, D, "Handle received \n");
284 dispatch(peer, pr, req, dispatch_receive);
287 struct timeval sub_start, sub_end, sub_accum = {0, 0};
288 uint64_t submits = 0;
289 void get_submits_stats(){
290 printf("Time waiting submit %lu.%06lu sec for %llu times.\n",
291 //(unsigned int)(t - peer->thread),
292 sub_accum.tv_sec, sub_accum.tv_usec, (long long unsigned int) submits);
295 int submit_peer_req(struct peerd *peer, struct peer_req *pr)
298 struct xseg_request *req = pr->req;
299 // assert req->portno == peer->portno ?
300 //TODO small function with error checking
301 XSEGLOG2 (&lc, D, "submitting peer req %u\n", (unsigned int)(pr - peer->peer_reqs));
302 ret = xseg_set_req_data(peer->xseg, req, (void *)(pr));
305 //printf("pr: %x , req_data: %x \n", pr, xseg_get_req_data(peer->xseg, req));
306 //gettimeofday(&sub_start, NULL);
307 ret = xseg_submit(peer->xseg, req, pr->portno, X_ALLOC);
308 //gettimeofday(&sub_end, NULL);
310 //timersub(&sub_end, &sub_start, &sub_end);
311 //timeradd(&sub_end, &sub_accum, &sub_accum);
314 xseg_signal(peer->xseg, ret);
318 static int check_ports(struct peerd *peer)
320 struct xseg *xseg = peer->xseg;
321 xport portno_start = peer->portno_start;
322 xport portno_end = peer->portno_end;
323 struct xseg_request *accepted, *received;
328 for (i = portno_start; i <= portno_end; i++) {
331 if (!isTerminate()) {
332 pr = alloc_peer_req(peer);
334 accepted = xseg_accept(xseg, i, X_NONBLOCK);
338 xseg_cancel_wait(xseg, i);
339 handle_accepted(peer, pr, accepted);
343 free_peer_req(peer, pr);
347 received = xseg_receive(xseg, i, X_NONBLOCK);
349 r = xseg_get_req_data(xseg, received, (void **) &pr);
351 XSEGLOG2(&lc, W, "Received request with no pr data\n");
352 xport p = xseg_respond(peer->xseg, received, peer->portno_start, X_ALLOC);
354 XSEGLOG2(&lc, W, "Could not respond stale request");
355 xseg_put_request(xseg, received, portno_start);
358 xseg_signal(xseg, p);
361 //maybe perform sanity check for pr
362 xseg_cancel_wait(xseg, i);
363 handle_received(peer, pr, received);
373 int thread_execute(struct peerd *peer, void (*func)(void *arg), void *arg)
375 struct thread *t = alloc_thread(peer);
382 // we could hijack a thread
386 static void* thread_loop(void *arg)
388 struct thread *t = (struct thread *) arg;
389 struct peerd *peer = t->peer;
390 struct xseg *xseg = peer->xseg;
391 xport portno_start = peer->portno_start;
392 xport portno_end = peer->portno_end;
393 pid_t pid =syscall(SYS_gettid);
395 uint64_t threshold=1000/(1 + portno_end - portno_start);
397 XSEGLOG2(&lc, D, "thread %u\n", (unsigned int) (t- peer->thread));
399 XSEGLOG2(&lc, I, "Thread %u has tid %u.\n", (unsigned int) (t- peer->thread), pid);
400 xseg_init_local_signal(xseg, peer->portno_start);
401 for (;!(isTerminate() && xq_count(&peer->free_reqs) == peer->nr_ops);) {
403 XSEGLOG2(&lc, D, "Thread %u executes function\n", (unsigned int) (t- peer->thread));
404 xseg_cancel_wait(xseg, peer->portno_start);
411 for(loops= threshold; loops > 0; loops--) {
413 xseg_prepare_wait(xseg, peer->portno_start);
414 if (check_ports(peer))
417 XSEGLOG2(&lc, I, "Thread %u goes to sleep\n", (unsigned int) (t- peer->thread));
418 xseg_wait_signal(xseg, 10000000UL);
419 xseg_cancel_wait(xseg, peer->portno_start);
420 XSEGLOG2(&lc, I, "Thread %u woke up\n", (unsigned int) (t- peer->thread));
422 wake_up_next_thread(peer);
426 int peerd_start_threads(struct peerd *peer)
429 uint32_t nr_threads = peer->nr_threads;
431 for (i = 0; i < nr_threads; i++) {
432 peer->thread[i].peer = peer;
433 pthread_cond_init(&peer->thread[i].cond,NULL);
434 pthread_mutex_init(&peer->thread[i].lock, NULL);
435 pthread_create(&peer->thread[i].tid, NULL, thread_loop, (void *)(peer->thread + i));
436 peer->thread[i].func = NULL;
437 peer->thread[i].arg = NULL;
444 void defer_request(struct peerd *peer, struct peer_req *pr)
446 // assert canDefer(peer);
447 // xseg_submit(peer->xseg, peer->defer_portno, pr->req);
448 // xseg_signal(peer->xseg, peer->defer_portno);
449 // free_peer_req(peer, pr);
452 static int peerd_loop(struct peerd *peer)
456 if (peer->interactive_func)
457 peer->interactive_func();
458 for (i = 0; i < peer->nr_threads; i++) {
459 pthread_join(peer->thread[i].tid, NULL);
462 struct xseg *xseg = peer->xseg;
463 xport portno_start = peer->portno_start;
464 xport portno_end = peer->portno_end;
465 uint64_t threshold=1000/(1 + portno_end - portno_start);
466 pid_t pid =syscall(SYS_gettid);
469 XSEGLOG2(&lc, I, "Peer has tid %u.\n", pid);
470 xseg_init_local_signal(xseg, peer->portno_start);
471 for (;!(isTerminate() && xq_count(&peer->free_reqs) == peer->nr_ops);) {
472 for(loops= threshold; loops > 0; loops--) {
474 xseg_prepare_wait(xseg, peer->portno_start);
475 if (check_ports(peer))
483 XSEGLOG2(&lc, I, "Peer goes to sleep\n");
484 xseg_wait_signal(xseg, 10000000UL);
485 xseg_cancel_wait(xseg, peer->portno_start);
486 XSEGLOG2(&lc, I, "Peer woke up\n");
491 xseg_quit_local_signal(xseg, peer->portno_start);
496 static struct xseg *join(char *spec)
498 struct xseg_config config;
501 (void)xseg_parse_spec(spec, &config);
502 xseg = xseg_join(config.type, config.name, PEER_TYPE, NULL);
506 (void)xseg_create(&config);
507 return xseg_join(config.type, config.name, PEER_TYPE, NULL);
510 static struct peerd* peerd_init(uint32_t nr_ops, char* spec, long portno_start,
511 long portno_end, uint32_t nr_threads, uint32_t defer_portno)
515 struct xseg_port *port;
520 peer = malloc(sizeof(struct peerd));
525 peer->nr_ops = nr_ops;
526 peer->defer_portno = defer_portno;
528 peer->nr_threads = nr_threads;
529 peer->thread = calloc(nr_threads, sizeof(struct thread));
533 peer->peer_reqs = calloc(nr_ops, sizeof(struct peer_req));
534 if (!peer->peer_reqs){
540 if (!xq_alloc_seq(&peer->free_reqs, nr_ops, nr_ops))
543 if (!xq_alloc_empty(&peer->threads, nr_threads))
546 if (xseg_initialize()){
547 printf("cannot initialize library\n");
550 peer->xseg = join(spec);
554 peer->portno_start = (xport) portno_start;
555 peer->portno_end= (xport) portno_end;
556 port = xseg_bind_port(peer->xseg, peer->portno_start, NULL);
558 printf("cannot bind to port %u\n", (unsigned int) peer->portno_start);
563 for (p = peer->portno_start + 1; p <= peer->portno_end; p++) {
564 struct xseg_port *tmp;
565 tmp = xseg_bind_port(peer->xseg, p, (void *)xseg_get_signal_desc(peer->xseg, port));
567 printf("cannot bind to port %u\n", (unsigned int) p);
572 printf("Peer on ports %u-%u\n", peer->portno_start,
575 for (i = 0; i < nr_ops; i++) {
576 peer->peer_reqs[i].peer = peer;
577 peer->peer_reqs[i].req = NULL;
578 peer->peer_reqs[i].retval = 0;
579 peer->peer_reqs[i].priv = NULL;
580 peer->peer_reqs[i].portno = NoPort;
582 peer->peer_reqs[i].cond = st_cond_new(); //FIXME err check
586 peer->interactive_func = NULL;
591 int pidfile_remove(char *path, int fd)
594 return (unlink(path));
597 int pidfile_write(int pid_fd)
600 snprintf(buf, sizeof(buf), "%ld", syscall(SYS_gettid));
603 lseek(pid_fd, 0, SEEK_SET);
604 int ret = write(pid_fd, buf, strlen(buf));
608 int pidfile_read(char *path, pid_t *pid)
610 char buf[16], *endptr;
613 int fd = open(path, O_RDONLY);
616 int ret = read(fd, buf, 15);
622 *pid = strtol(buf, &endptr, 10);
623 if (endptr != &buf[ret]){
631 int pidfile_open(char *path, pid_t *old_pid)
634 int fd = open(path, O_CREAT|O_EXCL|O_WRONLY);
636 if (errno == -EEXIST)
637 pidfile_read(path, old_pid);
642 int main(int argc, char *argv[])
644 struct peerd *peer = NULL;
647 int i, r, daemonize = 0;
648 long portno_start = -1, portno_end = -1, portno = -1;
650 uint32_t nr_ops = 16;
651 uint32_t nr_threads = 16 ;
652 unsigned int debug_level = 0;
653 uint32_t defer_portno = NoPort;
654 char *logfile = NULL;
655 char *pidfile = NULL;
659 //capture here -g spec, -n nr_ops, -p portno, -t nr_threads -v verbose level
660 // -dp xseg_portno to defer blocking requests
662 //TODO print messages on arg parsing error
663 //TODO string checking
665 for (i = 1; i < argc; i++) {
666 if (!strcmp(argv[i], "-g") && i + 1 < argc) {
672 if (!strcmp(argv[i], "-sp") && i + 1 < argc) {
673 portno_start = strtoul(argv[i+1], NULL, 10);
678 if (!strcmp(argv[i], "-ep") && i + 1 < argc) {
679 portno_end = strtoul(argv[i+1], NULL, 10);
684 if (!strcmp(argv[i], "-p") && i + 1 < argc) {
685 portno = strtoul(argv[i+1], NULL, 10);
690 if (!strcmp(argv[i], "-n") && i + 1 < argc) {
691 nr_ops = strtoul(argv[i+1], NULL, 10);
695 if (!strcmp(argv[i], "-v") && i + 1 < argc ) {
696 debug_level = atoi(argv[i+1]);
700 if (!strcmp(argv[i], "-t") && i + 1 < argc ) {
701 nr_threads = strtoul(argv[i+1], NULL, 10);
705 if (!strcmp(argv[i], "-dp") && i + 1 < argc ) {
706 defer_portno = strtoul(argv[i+1], NULL, 10);
710 if (!strcmp(argv[i], "-l") && i + 1 < argc ) {
715 if (!strcmp(argv[i], "-d")) {
719 if (!strcmp(argv[i], "--pidfile") && i + 1 < argc ) {
726 r = init_logctx(&lc, argv[0], debug_level, logfile,
727 REDIRECT_STDOUT|REDIRECT_STDERR);
729 XSEGLOG("Cannot initialize logging to logfile");
732 XSEGLOG2(&lc, D, "Main thread has tid %ld.\n", syscall(SYS_gettid));
735 pid_fd = pidfile_open(pidfile, &old_pid);
738 XSEGLOG2(&lc, E, "Daemon already running, pid: %d.", old_pid);
740 XSEGLOG2(&lc, E, "Cannot open or create pidfile");
747 if (daemon(0, 1) < 0){
748 XSEGLOG2(&lc, E, "Cannot daemonize");
754 pidfile_write(pid_fd);
756 //TODO perform argument sanity checks
757 verbose = debug_level;
759 portno_start = portno;
764 peer = peerd_init(nr_ops, spec, portno_start, portno_end, nr_threads, defer_portno);
770 r = custom_peer_init(peer, argc, argv);
774 peerd_start_threads(peer);
778 st_thread_t st = st_thread_create(peerd_loop, peer, 1, 0);
779 r = st_thread_join(st, NULL);
781 r = peerd_loop(peer);
785 pidfile_remove(pidfile, pid_fd);