6 #include <sys/syscall.h>
17 #include <xseg/xseg.h>
// Peer type string passed to xseg_join() in join().
// NOTE(review): two alternative definitions are visible; presumably each one
// sits under a different preprocessor conditional that is not shown in this
// excerpt - confirm.
21 #define PEER_TYPE "pthread"
23 #define PEER_TYPE "posix"
26 //FIXME this should not be defined here probably
// Buffer capacities used by main(): spec[] and pidfile[] are declared with
// these sizes plus one extra byte.
27 #define MAX_SPEC_LEN 128
28 #define MAX_PIDFILE_LEN 512
// Termination flag referenced by isTerminate(). The statement that sets it
// is not visible in this excerpt.
30 volatile unsigned int terminated = 0;
// Verbosity level: main() copies the parsed debug level into it and
// renew_logfile() passes it on to renew_logctx().
31 unsigned int verbose = 0;
// Peer handle used by signal_handler(), which only receives the signal number.
38 struct peerd *global_peer;
// Callback taking one opaque argument.
// NOTE(review): presumably a member of struct thread (peerd_start_threads()
// resets peer->thread[i].func); the enclosing declaration is not visible here.
45 void (*func)(void *arg);
// Pop an index from the head of the peer->threads queue and return the
// address of the matching element of the peer->thread array.
// NOTE(review): lines between the pop and the return are missing from this
// excerpt; presumably they handle an empty queue - confirm.
50 inline static struct thread* alloc_thread(struct peerd *peer)
52 xqindex idx = xq_pop_head(&peer->threads, 1);
55 return peer->thread + idx;
// Put thread t back on the peer->threads queue.
// The queue stores indices, so t must point into the peer->thread array:
// its index is recovered by pointer subtraction.
58 inline static void free_thread(struct peerd *peer, struct thread *t)
60 xqindex idx = t - peer->thread;
61 xq_append_head(&peer->threads, idx, 1);
// Signal thread t's condition variable while holding t->lock.
65 inline static void __wake_up_thread(struct thread *t)
67 pthread_mutex_lock(&t->lock);
68 pthread_cond_signal(&t->cond);
69 pthread_mutex_unlock(&t->lock);
// NOTE(review): only the signature is visible in this excerpt; presumably a
// wrapper around __wake_up_thread() - confirm against the full file.
72 inline static void wake_up_thread(struct thread* t)
// Signal the peer's first port (peer->portno_start) on its xseg segment.
// Returns whatever xseg_signal() returns.
79 inline static int wake_up_next_thread(struct peerd *peer)
81 return (xseg_signal(peer->xseg, peer->portno_start));
// Report whether the peer has been asked to stop, based on the global
// `terminated` flag.
// NOTE(review): the visible return combines `!ta` and `terminated` with a
// bitwise `&`, and `ta` is not declared anywhere in this excerpt. Lines
// around it are missing (comment terminator, possibly preprocessor
// conditionals), so confirm which return statement is actually compiled.
86 static inline int isTerminate()
88 /* ta doesn't need to be taken into account, because the main loops
89 * doesn't check the terminated flag if ta is not 0.
93 return (!ta & terminated);
// Handler installed by setup_signals() for SIGTERM, SIGINT and SIGQUIT.
// Logs the event and signals the peer's first port through
// wake_up_next_thread(global_peer).
// NOTE(review): the statement that sets `terminated` is not visible in this
// excerpt. Also confirm that XSEGLOG2 is safe to call from a signal handler.
101 void signal_handler(int signal)
103 XSEGLOG2(&lc, I, "Caught signal. Terminating gracefully");
106 wake_up_next_thread(global_peer);
// Handler installed by setup_signals() for SIGUSR1.
// Logs the event and calls renew_logctx() with the REOPEN_FILE flag and the
// current `verbose` level.
110 void renew_logfile(int signal)
112 XSEGLOG2(&lc, I, "Caught signal. Renewing logfile");
113 renew_logctx(&lc, NULL, verbose, NULL, REOPEN_FILE);
// Install the process signal handlers with sigaction().
// SIGTERM, SIGINT and SIGQUIT are routed to signal_handler(); SIGUSR1 is
// routed to renew_logfile().
// NOTE(review): the declarations of `sa` and `r`, the checks on `r` and the
// return statement are not visible in this excerpt.
116 static int setup_signals(struct peerd *peer)
// Start from an empty mask: no additional signals are blocked in the handler.
123 sigemptyset(&sa.sa_mask);
125 sa.sa_handler = signal_handler;
126 r = sigaction(SIGTERM, &sa, NULL);
129 r = sigaction(SIGINT, &sa, NULL);
132 r = sigaction(SIGQUIT, &sa, NULL);
// Same sigaction struct is reused with a different handler for SIGUSR1.
136 sa.sa_handler = renew_logfile;
137 r = sigaction(SIGUSR1, &sa, NULL);
// Return non-zero when the peer has a defer port configured, i.e. when
// peer->defer_portno is anything other than NoPort.
144 inline int canDefer(struct peerd *peer)
146 return !(peer->defer_portno == NoPort);
// Debug helper: print a request's fields, target name and data to stdout.
// The target and data are first copied into local 64-byte buffers.
// NOTE(review): strncpy() does not NUL-terminate when it copies the full
// count. The stores that terminate target[] and data[] are not visible in
// this excerpt - confirm they exist before the printf().
// NOTE(review): the "req id" field prints the request pointer itself, cast
// to unsigned long.
149 void print_req(struct xseg *xseg, struct xseg_request *req)
151 char target[64], data[64];
152 char *req_target, *req_data;
// Clamp the copy length to 63 so one byte of target[] stays free.
153 unsigned int end = (req->targetlen> 63) ? 63 : req->targetlen;
154 req_target = xseg_get_target(xseg, req);
155 req_data = xseg_get_data(xseg, req);
158 strncpy(target, req_target, end);
// The data copy is capped at 63 bytes regardless of req->datalen.
160 strncpy(data, req_data, 63);
162 printf("req id:%lu, op:%u %llu:%lu serviced: %lu, reqstate: %u\n"
163 "src: %u, st: %u, dst: %u dt: %u\n"
164 "target[%u]:'%s', data[%llu]:\n%s------------------\n\n",
165 (unsigned long)(req),
166 (unsigned int)req->op,
167 (unsigned long long)req->offset,
168 (unsigned long)req->size,
169 (unsigned long)req->serviced,
170 (unsigned int)req->state,
171 (unsigned int)req->src_portno,
172 (unsigned int)req->src_transit_portno,
173 (unsigned int)req->dst_portno,
174 (unsigned int)req->dst_transit_portno,
175 (unsigned int)req->targetlen, target,
176 (unsigned long long)req->datalen, data);
// Debug helper: print a peer request (index within peer->peer_reqs, op,
// offset, size, serviced, retval, state) plus up to 63 bytes of its target
// and data to stdout.
// NOTE(review): the format string starts with "%s:" but the line passing the
// matching argument (presumably `msg`) is missing from this excerpt.
// NOTE(review): strncpy() does not NUL-terminate when it copies the full
// count; the terminating stores are not visible here - confirm they exist.
179 void log_pr(char *msg, struct peer_req *pr)
181 char target[64], data[64];
182 char *req_target, *req_data;
183 struct peerd *peer = pr->peer;
184 struct xseg *xseg = pr->peer->xseg;
185 req_target = xseg_get_target(xseg, pr->req);
186 req_data = xseg_get_data(xseg, pr->req);
187 /* null terminate name in case of req->target is less than 63 characters,
188 * and next character after name (aka first byte of next buffer) is not
191 unsigned int end = (pr->req->targetlen> 63) ? 63 : pr->req->targetlen;
193 strncpy(target, req_target, end);
195 strncpy(data, req_data, 63);
197 printf("%s: req id:%u, op:%u %llu:%lu serviced: %lu, retval: %lu, reqstate: %u\n"
198 "target[%u]:'%s', data[%llu]:\n%s------------------\n\n",
200 (unsigned int)(pr - peer->peer_reqs),
201 (unsigned int)pr->req->op,
202 (unsigned long long)pr->req->offset,
203 (unsigned long)pr->req->size,
204 (unsigned long)pr->req->serviced,
205 (unsigned long)pr->retval,
206 (unsigned int)pr->req->state,
207 (unsigned int)pr->req->targetlen, target,
208 (unsigned long long)pr->req->datalen, data);
// Pop an index from the head of peer->free_reqs and return the address of
// the matching element of the peer->peer_reqs array.
// NOTE(review): lines between the pop and the return are missing from this
// excerpt; presumably they handle an empty queue - confirm.
212 inline struct peer_req *alloc_peer_req(struct peerd *peer)
214 xqindex idx = xq_pop_head(&peer->free_reqs, 1);
217 return peer->peer_reqs + idx;
// Return pr to the peer->free_reqs queue.
// pr must point into the peer->peer_reqs array: its queue index is
// recovered by pointer subtraction.
// NOTE(review): one line between the two visible statements is missing from
// this excerpt.
220 inline void free_peer_req(struct peerd *peer, struct peer_req *pr)
222 xqindex idx = pr - peer->peer_reqs;
224 xq_append_head(&peer->free_reqs, idx, 1);
// Accumulators for timing xseg_respond(): start/end stamps, running total
// and call counter. In the visible code the gettimeofday()/timeradd() calls
// that would feed them (in complete()) are commented out.
227 struct timeval resp_start, resp_end, resp_accum = {0, 0};
228 uint64_t responds = 0;
// Print the accumulated respond time and the respond count to stdout.
// NOTE(review): tv_sec and tv_usec are passed to %lu without a cast.
229 void get_responds_stats(){
230 printf("Time waiting respond %lu.%06lu sec for %llu times.\n",
231 //(unsigned int)(t - peer->thread),
232 resp_accum.tv_sec, resp_accum.tv_usec, (long long unsigned int) responds);
// Finish the request attached to pr as failed.
// Sets XS_FAILED in the request state, responds on pr->portno, signals the
// port returned by xseg_respond(), releases pr and signals the peer's first
// port.
// NOTE(review): the declaration of `p` and any check of xseg_respond()'s
// result are not visible in this excerpt.
236 void fail(struct peerd *peer, struct peer_req *pr)
238 struct xseg_request *req = pr->req;
240 XSEGLOG2(&lc, D, "failing req %u", (unsigned int) (pr - peer->peer_reqs));
241 req->state |= XS_FAILED;
242 //xseg_set_req_data(peer->xseg, pr->req, NULL);
243 p = xseg_respond(peer->xseg, req, pr->portno, X_ALLOC);
244 xseg_signal(peer->xseg, p);
245 free_peer_req(peer, pr);
247 wake_up_next_thread(peer);
// Finish the request attached to pr as served.
// Sets XS_SERVED in the request state, responds on pr->portno, signals the
// port returned by xseg_respond(), releases pr and signals the peer's first
// port.
// The commented-out lines are timing instrumentation around xseg_respond().
// NOTE(review): the declaration of `p` and any check of xseg_respond()'s
// result are not visible in this excerpt.
252 void complete(struct peerd *peer, struct peer_req *pr)
254 struct xseg_request *req = pr->req;
256 req->state |= XS_SERVED;
257 //xseg_set_req_data(peer->xseg, pr->req, NULL);
258 //gettimeofday(&resp_start, NULL);
259 p = xseg_respond(peer->xseg, req, pr->portno, X_ALLOC);
260 //gettimeofday(&resp_end, NULL);
262 //timersub(&resp_end, &resp_start, &resp_end);
263 //timeradd(&resp_end, &resp_accum, &resp_accum);
264 //printf("xseg_signal: %u\n", p);
265 xseg_signal(peer->xseg, p);
266 free_peer_req(peer, pr);
268 wake_up_next_thread(peer);
// Handle a request that check_ports() has just accepted: log it and forward
// it to dispatch() with reason dispatch_accept.
// NOTE(review): in the visible lines `xreq` is only referenced from
// commented-out code; two lines of this function are missing from the
// excerpt, so it may be used there.
272 static void handle_accepted(struct peerd *peer, struct peer_req *pr,
273 struct xseg_request *req)
275 struct xseg_request *xreq = pr->req;
276 //assert xreq == req;
277 XSEGLOG2(&lc, D, "Handle accepted");
279 //xreq->state = XS_ACCEPTED;
281 dispatch(peer, pr, req, dispatch_accept);
// Handle a request that check_ports() has just received: log it and forward
// it to dispatch() with reason dispatch_receive.
284 static void handle_received(struct peerd *peer, struct peer_req *pr,
285 struct xseg_request *req)
287 //struct xseg_request *req = pr->req;
288 //assert req->state != XS_ACCEPTED;
289 XSEGLOG2(&lc, D, "Handle received \n");
290 dispatch(peer, pr, req, dispatch_receive);
// Accumulators for timing xseg_submit(): start/end stamps, running total and
// call counter. In the visible code the gettimeofday()/timeradd() calls that
// would feed them (in submit_peer_req()) are commented out.
293 struct timeval sub_start, sub_end, sub_accum = {0, 0};
294 uint64_t submits = 0;
// Print the accumulated submit time and the submit count to stdout.
// NOTE(review): tv_sec and tv_usec are passed to %lu without a cast.
295 void get_submits_stats(){
296 printf("Time waiting submit %lu.%06lu sec for %llu times.\n",
297 //(unsigned int)(t - peer->thread),
298 sub_accum.tv_sec, sub_accum.tv_usec, (long long unsigned int) submits);
// Submit pr's request through pr->portno.
// pr is first stored as the request's private data so that check_ports()
// can recover it with xseg_get_req_data() when the request is received
// back. The port returned by xseg_submit() is then signalled.
// NOTE(review): the declaration of `ret`, the error checks after
// xseg_set_req_data()/xseg_submit() and the return statement are not
// visible in this excerpt.
301 int submit_peer_req(struct peerd *peer, struct peer_req *pr)
304 struct xseg_request *req = pr->req;
305 // assert req->portno == peer->portno ?
306 //TODO small function with error checking
307 XSEGLOG2 (&lc, D, "submitting peer req %u\n", (unsigned int)(pr - peer->peer_reqs));
308 ret = xseg_set_req_data(peer->xseg, req, (void *)(pr));
311 //printf("pr: %x , req_data: %x \n", pr, xseg_get_req_data(peer->xseg, req));
312 //gettimeofday(&sub_start, NULL);
313 ret = xseg_submit(peer->xseg, req, pr->portno, X_ALLOC);
314 //gettimeofday(&sub_end, NULL);
316 //timersub(&sub_end, &sub_start, &sub_end);
317 //timeradd(&sub_end, &sub_accum, &sub_accum);
320 xseg_signal(peer->xseg, ret);
// Poll every port in [portno_start, portno_end] once, using non-blocking
// accept and receive calls.
// The result is tested by thread_loop() and peerd_loop(); its exact meaning
// is not visible in this excerpt.
// NOTE(review): many lines (declarations, if/else conditions, braces, return)
// are missing here, so the branch structure below is only partly visible.
324 static int check_ports(struct peerd *peer)
326 struct xseg *xseg = peer->xseg;
327 xport portno_start = peer->portno_start;
328 xport portno_end = peer->portno_end;
329 struct xseg_request *accepted, *received;
334 for (i = portno_start; i <= portno_end; i++) {
// New requests are only accepted while the peer is not terminating.
337 if (!isTerminate()) {
338 pr = alloc_peer_req(peer);
340 accepted = xseg_accept(xseg, i, X_NONBLOCK);
344 xseg_cancel_wait(xseg, i);
345 handle_accepted(peer, pr, accepted);
// presumably the path where nothing was accepted: give the slot back - confirm
349 free_peer_req(peer, pr);
353 received = xseg_receive(xseg, i, X_NONBLOCK);
// Recover the peer_req that submit_peer_req() attached to the request.
355 r = xseg_get_req_data(xseg, received, (void **) &pr);
357 XSEGLOG2(&lc, W, "Received request with no pr data\n");
358 xport p = xseg_respond(peer->xseg, received, peer->portno_start, X_ALLOC);
360 XSEGLOG2(&lc, W, "Could not respond stale request");
361 xseg_put_request(xseg, received, portno_start);
364 xseg_signal(xseg, p);
367 //maybe perform sanity check for pr
368 xseg_cancel_wait(xseg, i);
369 handle_received(peer, pr, received);
// Obtain a thread slot with alloc_thread(), presumably to run func(arg) on
// it - confirm.
// NOTE(review): everything after the allocation is missing from this
// excerpt.
379 int thread_execute(struct peerd *peer, void (*func)(void *arg), void *arg)
381 struct thread *t = alloc_thread(peer);
388 // we could hijack a thread
// Worker thread entry point; peerd_start_threads() passes it to
// pthread_create() with the thread's own struct thread as arg.
// The loop runs until isTerminate() is true and the free_reqs queue holds
// all nr_ops entries. Each pass polls the ports up to `threshold` times,
// then sleeps in xseg_wait_signal().
// NOTE(review): several lines (declarations, conditions, braces, return) are
// missing from this excerpt.
392 static void* thread_loop(void *arg)
394 struct thread *t = (struct thread *) arg;
395 struct peerd *peer = t->peer;
396 struct xseg *xseg = peer->xseg;
397 xport portno_start = peer->portno_start;
398 xport portno_end = peer->portno_end;
399 pid_t pid =syscall(SYS_gettid);
// Poll budget per pass: 1000 divided by the number of ports served.
401 uint64_t threshold=1000/(1 + portno_end - portno_start);
403 XSEGLOG2(&lc, D, "thread %u\n", (unsigned int) (t- peer->thread));
405 XSEGLOG2(&lc, I, "Thread %u has tid %u.\n", (unsigned int) (t- peer->thread), pid);
406 xseg_init_local_signal(xseg, peer->portno_start);
407 for (;!(isTerminate() && xq_count(&peer->free_reqs) == peer->nr_ops);) {
// presumably taken when a function has been assigned to this thread - confirm
409 XSEGLOG2(&lc, D, "Thread %u executes function\n", (unsigned int) (t- peer->thread));
410 xseg_cancel_wait(xseg, peer->portno_start);
417 for(loops= threshold; loops > 0; loops--) {
419 xseg_prepare_wait(xseg, peer->portno_start);
420 if (check_ports(peer))
423 XSEGLOG2(&lc, I, "Thread %u goes to sleep\n", (unsigned int) (t- peer->thread));
// Timeout value is 10000000UL; its unit is not visible in this file.
424 xseg_wait_signal(xseg, 10000000UL);
425 xseg_cancel_wait(xseg, peer->portno_start);
426 XSEGLOG2(&lc, I, "Thread %u woke up\n", (unsigned int) (t- peer->thread));
428 wake_up_next_thread(peer);
// Initialise each of the peer's nr_threads thread slots (back-pointer,
// condition variable, mutex) and start its worker with pthread_create()
// running thread_loop().
// NOTE(review): func and arg are cleared after pthread_create(), so the new
// thread may already be running when they are written.
// NOTE(review): the results of the pthread_* calls are not checked in the
// visible lines.
432 int peerd_start_threads(struct peerd *peer)
435 uint32_t nr_threads = peer->nr_threads;
437 for (i = 0; i < nr_threads; i++) {
438 peer->thread[i].peer = peer;
439 pthread_cond_init(&peer->thread[i].cond,NULL);
440 pthread_mutex_init(&peer->thread[i].lock, NULL);
441 pthread_create(&peer->thread[i].tid, NULL, thread_loop, (void *)(peer->thread + i));
442 peer->thread[i].func = NULL;
443 peer->thread[i].arg = NULL;
// Intended to hand pr's request over to the peer's defer port.
// Every statement visible in this excerpt is commented out, so the function
// appears to do nothing at present - confirm against the full file.
450 void defer_request(struct peerd *peer, struct peer_req *pr)
452 // assert canDefer(peer);
453 // xseg_submit(peer->xseg, peer->defer_portno, pr->req);
454 // xseg_signal(peer->xseg, peer->defer_portno);
455 // free_peer_req(peer, pr);
// Main loop of the peer.
// NOTE(review): two alternative bodies are visible; presumably each is
// selected by a preprocessor conditional that is missing from this excerpt -
// confirm.
//  - First variant: run the optional interactive_func, then join every
//    worker thread.
//  - Second variant: poll the ports in the calling thread with the same
//    poll/sleep scheme as thread_loop(), then tear down the local signal.
458 static int peerd_loop(struct peerd *peer)
462 if (peer->interactive_func)
463 peer->interactive_func();
464 for (i = 0; i < peer->nr_threads; i++) {
465 pthread_join(peer->thread[i].tid, NULL);
468 struct xseg *xseg = peer->xseg;
469 xport portno_start = peer->portno_start;
470 xport portno_end = peer->portno_end;
// Poll budget per pass: 1000 divided by the number of ports served.
471 uint64_t threshold=1000/(1 + portno_end - portno_start);
472 pid_t pid =syscall(SYS_gettid);
475 XSEGLOG2(&lc, I, "Peer has tid %u.\n", pid);
476 xseg_init_local_signal(xseg, peer->portno_start);
// Keep going until termination was requested and free_reqs holds nr_ops entries.
477 for (;!(isTerminate() && xq_count(&peer->free_reqs) == peer->nr_ops);) {
478 for(loops= threshold; loops > 0; loops--) {
480 xseg_prepare_wait(xseg, peer->portno_start);
481 if (check_ports(peer))
489 XSEGLOG2(&lc, I, "Peer goes to sleep\n");
490 xseg_wait_signal(xseg, 10000000UL);
491 xseg_cancel_wait(xseg, peer->portno_start);
492 XSEGLOG2(&lc, I, "Peer woke up\n");
497 xseg_quit_local_signal(xseg, peer->portno_start);
// Parse the segment spec and join the segment it names as a PEER_TYPE peer.
// A second path creates the segment with xseg_create() and joins again.
// NOTE(review): presumably the second path is taken when the first join
// fails; the condition is not visible in this excerpt.
// The results of xseg_parse_spec() and xseg_create() are explicitly
// discarded with (void) casts.
502 static struct xseg *join(char *spec)
504 struct xseg_config config;
507 (void)xseg_parse_spec(spec, &config);
508 xseg = xseg_join(config.type, config.name, PEER_TYPE, NULL);
512 (void)xseg_create(&config);
513 return xseg_join(config.type, config.name, PEER_TYPE, NULL);
// Allocate and set up a struct peerd: thread and peer_req arrays, their
// index queues, the xseg segment (via join(spec)) and the bound port range
// [portno_start, portno_end].
// NOTE(review): error-handling branches, cleanup and the return statement
// are mostly missing from this excerpt.
516 static struct peerd* peerd_init(uint32_t nr_ops, char* spec, long portno_start,
517 long portno_end, uint32_t nr_threads, uint32_t defer_portno)
521 struct xseg_port *port;
526 peer = malloc(sizeof(struct peerd));
531 peer->nr_ops = nr_ops;
532 peer->defer_portno = defer_portno;
534 peer->nr_threads = nr_threads;
535 peer->thread = calloc(nr_threads, sizeof(struct thread));
539 peer->peer_reqs = calloc(nr_ops, sizeof(struct peer_req));
540 if (!peer->peer_reqs){
// Index queues: free_reqs for peer_reqs[] slots, threads for thread[] slots.
// presumably free_reqs starts out holding all nr_ops indices - confirm
546 if (!xq_alloc_seq(&peer->free_reqs, nr_ops, nr_ops))
549 if (!xq_alloc_empty(&peer->threads, nr_threads))
552 if (xseg_initialize()){
553 printf("cannot initialize library\n");
556 peer->xseg = join(spec);
// The long arguments are narrowed to xport here.
560 peer->portno_start = (xport) portno_start;
561 peer->portno_end= (xport) portno_end;
562 port = xseg_bind_port(peer->xseg, peer->portno_start, NULL);
564 printf("cannot bind to port %u\n", (unsigned int) peer->portno_start);
// Remaining ports are bound with the first port's signal descriptor.
569 for (p = peer->portno_start + 1; p <= peer->portno_end; p++) {
570 struct xseg_port *tmp;
571 tmp = xseg_bind_port(peer->xseg, p, (void *)xseg_get_signal_desc(peer->xseg, port));
573 printf("cannot bind to port %u\n", (unsigned int) p);
578 printf("Peer on ports %u-%u\n", peer->portno_start,
// Reset every peer_req slot to an idle state.
581 for (i = 0; i < nr_ops; i++) {
582 peer->peer_reqs[i].peer = peer;
583 peer->peer_reqs[i].req = NULL;
584 peer->peer_reqs[i].retval = 0;
585 peer->peer_reqs[i].priv = NULL;
586 peer->peer_reqs[i].portno = NoPort;
588 peer->peer_reqs[i].cond = st_cond_new(); //FIXME err check
592 peer->interactive_func = NULL;
// Remove the pidfile at `path` and return unlink()'s result.
// NOTE(review): a line before the return is missing from this excerpt;
// presumably it closes `fd` - confirm.
597 int pidfile_remove(char *path, int fd)
600 return (unlink(path));
// Write the calling thread's id (SYS_gettid), formatted as decimal text, at
// the start of the already-open pidfile descriptor.
// NOTE(review): the declaration of `buf`, the handling of a short or failed
// write() and the return statement are not visible in this excerpt.
603 int pidfile_write(int pid_fd)
606 snprintf(buf, sizeof(buf), "%ld", syscall(SYS_gettid));
609 lseek(pid_fd, 0, SEEK_SET);
610 int ret = write(pid_fd, buf, strlen(buf));
// Read a decimal pid from the file at `path` into *pid.
// At most 15 bytes are read into a 16-byte buffer. The parse is then checked
// by comparing strtol()'s end pointer with the end of the data read.
// NOTE(review): the error checks on open()/read() and the NUL termination of
// buf before strtol() are not visible in this excerpt - confirm they exist.
614 int pidfile_read(char *path, pid_t *pid)
616 char buf[16], *endptr;
619 int fd = open(path, O_RDONLY);
622 int ret = read(fd, buf, 15);
628 *pid = strtol(buf, &endptr, 10);
// Anything left unparsed means the file did not contain just a number.
629 if (endptr != &buf[ret]){
// Create the pidfile exclusively (O_CREAT|O_EXCL) for writing. On another
// path, read the pid already stored in it into *old_pid.
// NOTE(review): presumably pidfile_read() runs when open() fails because the
// file already exists; the condition is not visible in this excerpt.
// NOTE(review): open() is called with O_CREAT but without the mode argument,
// which POSIX requires; without it the new file's permissions are
// unspecified.
637 int pidfile_open(char *path, pid_t *old_pid)
640 int fd = open(path, O_CREAT|O_EXCL|O_WRONLY);
643 pidfile_read(path, old_pid);
// Print the command-line synopsis and the table of general peer options to
// stderr. argv0 is inserted as the program name.
// NOTE(review): some rows and the end of the fprintf() call are missing from
// this excerpt.
648 void usage(char *argv0)
650 fprintf(stderr, "Usage: %s [general options] [custom peer options]\n\n", argv0);
651 fprintf(stderr, "General peer options:\n"
652 " Option | Default | \n"
653 " --------------------------------------------\n"
654 " -g | None | Segment spec to join\n"
655 " -sp | NoPort | Start portno to bind\n"
656 " -ep | NoPort | End portno to bind\n"
657 " -p | NoPort | Portno to bind\n"
658 " -n | 16 | Number of ops\n"
659 " -v | 0 | Verbosity level\n"
660 " -l | None | Logfile \n"
661 " -d | No | Daemonize \n"
662 " --pidfile | None | Pidfile \n"
664 " -t | No | Number of threads \n"
// Program entry point. In the visible order: parse the general options, set
// up logging, open the pidfile, optionally daemonize, write the pidfile,
// create the peer with peerd_init(), run custom_peer_init(), start the
// worker threads, run peerd_loop() and finally remove the pidfile.
// NOTE(review): two argument parsers are visible (a manual strcmp() loop and
// the READ_ARG_* macros); presumably one of them is disabled or commented
// out in the full file - confirm.
// NOTE(review): many conditions, error paths and preprocessor lines are
// missing from this excerpt.
671 int main(int argc, char *argv[])
673 struct peerd *peer = NULL;
// -1 marks a port option that was not given on the command line.
676 long portno_start = -1, portno_end = -1, portno = -1;
679 int daemonize = 0, help = 0;
680 uint32_t nr_ops = 16;
681 uint32_t nr_threads = 1;
682 unsigned int debug_level = 0;
683 uint32_t defer_portno = NoPort;
687 char spec[MAX_SPEC_LEN + 1];
688 char logfile[MAX_LOGFILE_LEN + 1];
689 char pidfile[MAX_PIDFILE_LEN + 1];
695 //capture here -g spec, -n nr_ops, -p portno, -t nr_threads -v verbose level
696 // -dp xseg_portno to defer blocking requests
698 //TODO print messages on arg parsing error
699 //TODO string checking
// Manual parser: each option that takes a value requires a following argv entry.
702 for (i = 1; i < argc; i++) {
703 if (!strcmp(argv[i], "-g") && i + 1 < argc) {
709 if (!strcmp(argv[i], "-sp") && i + 1 < argc) {
710 portno_start = strtoul(argv[i+1], NULL, 10);
715 if (!strcmp(argv[i], "-ep") && i + 1 < argc) {
716 portno_end = strtoul(argv[i+1], NULL, 10);
721 if (!strcmp(argv[i], "-p") && i + 1 < argc) {
722 portno = strtoul(argv[i+1], NULL, 10);
727 if (!strcmp(argv[i], "-n") && i + 1 < argc) {
728 nr_ops = strtoul(argv[i+1], NULL, 10);
732 if (!strcmp(argv[i], "-v") && i + 1 < argc ) {
733 debug_level = atoi(argv[i+1]);
737 if (!strcmp(argv[i], "-t") && i + 1 < argc ) {
738 nr_threads = strtoul(argv[i+1], NULL, 10);
742 if (!strcmp(argv[i], "-dp") && i + 1 < argc ) {
743 defer_portno = strtoul(argv[i+1], NULL, 10);
747 if (!strcmp(argv[i], "-l") && i + 1 < argc ) {
752 if (!strcmp(argv[i], "-d")) {
756 if (!strcmp(argv[i], "--pidfile") && i + 1 < argc ) {
// Macro-based parser covering the same options plus -h/--help.
764 BEGIN_READ_ARGS(argc, argv);
765 READ_ARG_STRING("-g", spec, MAX_SPEC_LEN);
766 READ_ARG_ULONG("-sp", portno_start);
767 READ_ARG_ULONG("-ep", portno_end);
768 READ_ARG_ULONG("-p", portno);
769 READ_ARG_ULONG("-n", nr_ops);
770 READ_ARG_ULONG("-v", debug_level);
772 READ_ARG_ULONG("-t", nr_threads);
774 // READ_ARG_ULONG("-dp", defer_portno);
775 READ_ARG_STRING("-l", logfile, MAX_LOGFILE_LEN);
776 READ_ARG_BOOL("-d", daemonize);
777 READ_ARG_BOOL("-h", help);
778 READ_ARG_BOOL("--help", help);
779 READ_ARG_STRING("--pidfile", pidfile, MAX_PIDFILE_LEN);
// Logging is initialised with both stdout and stderr redirect flags set.
787 r = init_logctx(&lc, argv[0], debug_level, logfile,
788 REDIRECT_STDOUT|REDIRECT_STDERR);
790 XSEGLOG("Cannot initialize logging to logfile");
793 XSEGLOG2(&lc, D, "Main thread has tid %ld.\n", syscall(SYS_gettid));
796 pid_fd = pidfile_open(pidfile, &old_pid);
799 XSEGLOG2(&lc, E, "Daemon already running, pid: %d.", old_pid);
801 XSEGLOG2(&lc, E, "Cannot open or create pidfile");
808 if (daemon(0, 1) < 0){
809 XSEGLOG2(&lc, E, "Cannot daemonize");
// The pidfile is written after the daemon() call above.
815 pidfile_write(pid_fd);
817 //TODO perform argument sanity checks
818 verbose = debug_level;
// presumably applied when the single-port option -p was given - confirm
820 portno_start = portno;
824 peer = peerd_init(nr_ops, spec, portno_start, portno_end, nr_threads, defer_portno);
830 r = custom_peer_init(peer, argc, argv);
835 peerd_start_threads(peer);
// NOTE(review): peerd_loop() is declared as int (struct peerd *); confirm
// that this matches the start-routine type st_thread_create() expects.
839 st_thread_t st = st_thread_create(peerd_loop, peer, 1, 0);
840 r = st_thread_join(st, NULL);
842 r = peerd_loop(peer);
846 pidfile_remove(pidfile, pid_fd);