2 * Copyright 2012 GRNET S.A. All rights reserved.
4 * Redistribution and use in source and binary forms, with or
5 * without modification, are permitted provided that the following
8 * 1. Redistributions of source code must retain the above
9 * copyright notice, this list of conditions and the following
11 * 2. Redistributions in binary form must reproduce the above
12 * copyright notice, this list of conditions and the following
13 * disclaimer in the documentation and/or other materials
14 * provided with the distribution.
16 * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17 * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19 * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20 * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23 * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24 * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26 * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27 * POSSIBILITY OF SUCH DAMAGE.
29 * The views and conclusions contained in the software and
30 * documentation are those of the authors and should not be
31 * interpreted as representing official policies, either expressed
32 * or implied, of GRNET S.A.
38 #include <sys/types.h>
40 #include <sys/syscall.h>
51 #include <xseg/xseg.h>
55 #define PEER_TYPE "pthread"
57 #define PEER_TYPE "posix"
60 //FIXME this should not be defined here probably
61 #define MAX_SPEC_LEN 128
62 #define MAX_PIDFILE_LEN 512
64 volatile unsigned int terminated = 0;
65 unsigned int verbose = 0;
72 struct peerd *global_peer;
79 void (*func)(void *arg);
84 inline static struct thread* alloc_thread(struct peerd *peer)
86 xqindex idx = xq_pop_head(&peer->threads, 1);
89 return peer->thread + idx;
92 inline static void free_thread(struct peerd *peer, struct thread *t)
94 xqindex idx = t - peer->thread;
95 xq_append_head(&peer->threads, idx, 1);
99 inline static void __wake_up_thread(struct thread *t)
101 pthread_mutex_lock(&t->lock);
102 pthread_cond_signal(&t->cond);
103 pthread_mutex_unlock(&t->lock);
106 inline static void wake_up_thread(struct thread* t)
113 inline static int wake_up_next_thread(struct peerd *peer)
115 return (xseg_signal(peer->xseg, peer->portno_start));
120 static inline int isTerminate()
122 /* ta doesn't need to be taken into account, because the main loops
123 * doesn't check the terminated flag if ta is not 0.
127 return (!ta & terminated);
135 void signal_handler(int signal)
137 XSEGLOG2(&lc, I, "Caught signal. Terminating gracefully");
140 wake_up_next_thread(global_peer);
144 void renew_logfile(int signal)
146 XSEGLOG2(&lc, I, "Caught signal. Renewing logfile");
147 renew_logctx(&lc, NULL, verbose, NULL, REOPEN_FILE);
150 static int setup_signals(struct peerd *peer)
157 sigemptyset(&sa.sa_mask);
159 sa.sa_handler = signal_handler;
160 r = sigaction(SIGTERM, &sa, NULL);
163 r = sigaction(SIGINT, &sa, NULL);
166 r = sigaction(SIGQUIT, &sa, NULL);
170 sa.sa_handler = renew_logfile;
171 r = sigaction(SIGUSR1, &sa, NULL);
178 inline int canDefer(struct peerd *peer)
180 return !(peer->defer_portno == NoPort);
183 void print_req(struct xseg *xseg, struct xseg_request *req)
185 char target[64], data[64];
186 char *req_target, *req_data;
187 unsigned int end = (req->targetlen> 63) ? 63 : req->targetlen;
188 req_target = xseg_get_target(xseg, req);
189 req_data = xseg_get_data(xseg, req);
192 strncpy(target, req_target, end);
194 strncpy(data, req_data, 63);
196 printf("req id:%lu, op:%u %llu:%lu serviced: %lu, reqstate: %u\n"
197 "src: %u, st: %u, dst: %u dt: %u\n"
198 "target[%u]:'%s', data[%llu]:\n%s------------------\n\n",
199 (unsigned long)(req),
200 (unsigned int)req->op,
201 (unsigned long long)req->offset,
202 (unsigned long)req->size,
203 (unsigned long)req->serviced,
204 (unsigned int)req->state,
205 (unsigned int)req->src_portno,
206 (unsigned int)req->src_transit_portno,
207 (unsigned int)req->dst_portno,
208 (unsigned int)req->dst_transit_portno,
209 (unsigned int)req->targetlen, target,
210 (unsigned long long)req->datalen, data);
213 void log_pr(char *msg, struct peer_req *pr)
215 char target[64], data[64];
216 char *req_target, *req_data;
217 struct peerd *peer = pr->peer;
218 struct xseg *xseg = pr->peer->xseg;
219 req_target = xseg_get_target(xseg, pr->req);
220 req_data = xseg_get_data(xseg, pr->req);
221 /* null terminate name in case of req->target is less than 63 characters,
222 * and next character after name (aka first byte of next buffer) is not
225 unsigned int end = (pr->req->targetlen> 63) ? 63 : pr->req->targetlen;
227 strncpy(target, req_target, end);
229 strncpy(data, req_data, 63);
231 printf("%s: req id:%u, op:%u %llu:%lu serviced: %lu, retval: %lu, reqstate: %u\n"
232 "target[%u]:'%s', data[%llu]:\n%s------------------\n\n",
234 (unsigned int)(pr - peer->peer_reqs),
235 (unsigned int)pr->req->op,
236 (unsigned long long)pr->req->offset,
237 (unsigned long)pr->req->size,
238 (unsigned long)pr->req->serviced,
239 (unsigned long)pr->retval,
240 (unsigned int)pr->req->state,
241 (unsigned int)pr->req->targetlen, target,
242 (unsigned long long)pr->req->datalen, data);
246 inline struct peer_req *alloc_peer_req(struct peerd *peer)
248 xqindex idx = xq_pop_head(&peer->free_reqs, 1);
251 return peer->peer_reqs + idx;
254 inline void free_peer_req(struct peerd *peer, struct peer_req *pr)
256 xqindex idx = pr - peer->peer_reqs;
258 xq_append_head(&peer->free_reqs, idx, 1);
261 struct timeval resp_start, resp_end, resp_accum = {0, 0};
262 uint64_t responds = 0;
263 void get_responds_stats(){
264 printf("Time waiting respond %lu.%06lu sec for %llu times.\n",
265 //(unsigned int)(t - peer->thread),
266 resp_accum.tv_sec, resp_accum.tv_usec, (long long unsigned int) responds);
270 void fail(struct peerd *peer, struct peer_req *pr)
272 struct xseg_request *req = pr->req;
274 XSEGLOG2(&lc, D, "failing req %u", (unsigned int) (pr - peer->peer_reqs));
275 req->state |= XS_FAILED;
276 //xseg_set_req_data(peer->xseg, pr->req, NULL);
277 p = xseg_respond(peer->xseg, req, pr->portno, X_ALLOC);
278 xseg_signal(peer->xseg, p);
279 free_peer_req(peer, pr);
281 wake_up_next_thread(peer);
286 void complete(struct peerd *peer, struct peer_req *pr)
288 struct xseg_request *req = pr->req;
290 req->state |= XS_SERVED;
291 //xseg_set_req_data(peer->xseg, pr->req, NULL);
292 //gettimeofday(&resp_start, NULL);
293 p = xseg_respond(peer->xseg, req, pr->portno, X_ALLOC);
294 //gettimeofday(&resp_end, NULL);
296 //timersub(&resp_end, &resp_start, &resp_end);
297 //timeradd(&resp_end, &resp_accum, &resp_accum);
298 //printf("xseg_signal: %u\n", p);
299 xseg_signal(peer->xseg, p);
300 free_peer_req(peer, pr);
302 wake_up_next_thread(peer);
306 static void handle_accepted(struct peerd *peer, struct peer_req *pr,
307 struct xseg_request *req)
309 struct xseg_request *xreq = pr->req;
310 //assert xreq == req;
311 XSEGLOG2(&lc, D, "Handle accepted");
313 //xreq->state = XS_ACCEPTED;
315 dispatch(peer, pr, req, dispatch_accept);
318 static void handle_received(struct peerd *peer, struct peer_req *pr,
319 struct xseg_request *req)
321 //struct xseg_request *req = pr->req;
322 //assert req->state != XS_ACCEPTED;
323 XSEGLOG2(&lc, D, "Handle received \n");
324 dispatch(peer, pr, req, dispatch_receive);
327 struct timeval sub_start, sub_end, sub_accum = {0, 0};
328 uint64_t submits = 0;
329 void get_submits_stats(){
330 printf("Time waiting submit %lu.%06lu sec for %llu times.\n",
331 //(unsigned int)(t - peer->thread),
332 sub_accum.tv_sec, sub_accum.tv_usec, (long long unsigned int) submits);
335 int submit_peer_req(struct peerd *peer, struct peer_req *pr)
338 struct xseg_request *req = pr->req;
339 // assert req->portno == peer->portno ?
340 //TODO small function with error checking
341 XSEGLOG2 (&lc, D, "submitting peer req %u\n", (unsigned int)(pr - peer->peer_reqs));
342 ret = xseg_set_req_data(peer->xseg, req, (void *)(pr));
345 //printf("pr: %x , req_data: %x \n", pr, xseg_get_req_data(peer->xseg, req));
346 //gettimeofday(&sub_start, NULL);
347 ret = xseg_submit(peer->xseg, req, pr->portno, X_ALLOC);
348 //gettimeofday(&sub_end, NULL);
350 //timersub(&sub_end, &sub_start, &sub_end);
351 //timeradd(&sub_end, &sub_accum, &sub_accum);
354 xseg_signal(peer->xseg, ret);
358 static int check_ports(struct peerd *peer)
360 struct xseg *xseg = peer->xseg;
361 xport portno_start = peer->portno_start;
362 xport portno_end = peer->portno_end;
363 struct xseg_request *accepted, *received;
368 for (i = portno_start; i <= portno_end; i++) {
371 if (!isTerminate()) {
372 pr = alloc_peer_req(peer);
374 accepted = xseg_accept(xseg, i, X_NONBLOCK);
378 xseg_cancel_wait(xseg, i);
379 handle_accepted(peer, pr, accepted);
383 free_peer_req(peer, pr);
387 received = xseg_receive(xseg, i, X_NONBLOCK);
389 r = xseg_get_req_data(xseg, received, (void **) &pr);
391 XSEGLOG2(&lc, W, "Received request with no pr data\n");
392 xport p = xseg_respond(peer->xseg, received, peer->portno_start, X_ALLOC);
394 XSEGLOG2(&lc, W, "Could not respond stale request");
395 xseg_put_request(xseg, received, portno_start);
398 xseg_signal(xseg, p);
401 //maybe perform sanity check for pr
402 xseg_cancel_wait(xseg, i);
403 handle_received(peer, pr, received);
413 int thread_execute(struct peerd *peer, void (*func)(void *arg), void *arg)
415 struct thread *t = alloc_thread(peer);
422 // we could hijack a thread
426 static void* thread_loop(void *arg)
428 struct thread *t = (struct thread *) arg;
429 struct peerd *peer = t->peer;
430 struct xseg *xseg = peer->xseg;
431 xport portno_start = peer->portno_start;
432 xport portno_end = peer->portno_end;
433 pid_t pid =syscall(SYS_gettid);
435 uint64_t threshold=1000/(1 + portno_end - portno_start);
437 XSEGLOG2(&lc, D, "thread %u\n", (unsigned int) (t- peer->thread));
439 XSEGLOG2(&lc, I, "Thread %u has tid %u.\n", (unsigned int) (t- peer->thread), pid);
440 xseg_init_local_signal(xseg, peer->portno_start);
441 for (;!(isTerminate() && xq_count(&peer->free_reqs) == peer->nr_ops);) {
443 XSEGLOG2(&lc, D, "Thread %u executes function\n", (unsigned int) (t- peer->thread));
444 xseg_cancel_wait(xseg, peer->portno_start);
451 for(loops= threshold; loops > 0; loops--) {
453 xseg_prepare_wait(xseg, peer->portno_start);
454 if (check_ports(peer))
457 XSEGLOG2(&lc, I, "Thread %u goes to sleep\n", (unsigned int) (t- peer->thread));
458 xseg_wait_signal(xseg, 10000000UL);
459 xseg_cancel_wait(xseg, peer->portno_start);
460 XSEGLOG2(&lc, I, "Thread %u woke up\n", (unsigned int) (t- peer->thread));
462 wake_up_next_thread(peer);
463 custom_peer_finalize(peer);
467 int peerd_start_threads(struct peerd *peer)
470 uint32_t nr_threads = peer->nr_threads;
472 for (i = 0; i < nr_threads; i++) {
473 peer->thread[i].peer = peer;
474 pthread_cond_init(&peer->thread[i].cond,NULL);
475 pthread_mutex_init(&peer->thread[i].lock, NULL);
476 pthread_create(&peer->thread[i].tid, NULL, thread_loop, (void *)(peer->thread + i));
477 peer->thread[i].func = NULL;
478 peer->thread[i].arg = NULL;
486 void defer_request(struct peerd *peer, struct peer_req *pr)
488 // assert canDefer(peer);
489 // xseg_submit(peer->xseg, peer->defer_portno, pr->req);
490 // xseg_signal(peer->xseg, peer->defer_portno);
491 // free_peer_req(peer, pr);
494 static int peerd_loop(struct peerd *peer)
498 if (peer->interactive_func)
499 peer->interactive_func();
500 for (i = 0; i < peer->nr_threads; i++) {
501 pthread_join(peer->thread[i].tid, NULL);
504 struct xseg *xseg = peer->xseg;
505 xport portno_start = peer->portno_start;
506 xport portno_end = peer->portno_end;
507 uint64_t threshold=1000/(1 + portno_end - portno_start);
508 pid_t pid =syscall(SYS_gettid);
511 XSEGLOG2(&lc, I, "Peer has tid %u.\n", pid);
512 xseg_init_local_signal(xseg, peer->portno_start);
513 for (;!(isTerminate() && xq_count(&peer->free_reqs) == peer->nr_ops);) {
514 for(loops= threshold; loops > 0; loops--) {
516 xseg_prepare_wait(xseg, peer->portno_start);
517 if (check_ports(peer))
525 XSEGLOG2(&lc, I, "Peer goes to sleep\n");
526 xseg_wait_signal(xseg, 10000000UL);
527 xseg_cancel_wait(xseg, peer->portno_start);
528 XSEGLOG2(&lc, I, "Peer woke up\n");
533 custom_peer_finalize(peer);
534 xseg_quit_local_signal(xseg, peer->portno_start);
539 static struct xseg *join(char *spec)
541 struct xseg_config config;
544 (void)xseg_parse_spec(spec, &config);
545 xseg = xseg_join(config.type, config.name, PEER_TYPE, NULL);
549 (void)xseg_create(&config);
550 return xseg_join(config.type, config.name, PEER_TYPE, NULL);
553 static struct peerd* peerd_init(uint32_t nr_ops, char* spec, long portno_start,
554 long portno_end, uint32_t nr_threads, uint32_t defer_portno)
558 struct xseg_port *port;
563 peer = malloc(sizeof(struct peerd));
568 peer->nr_ops = nr_ops;
569 peer->defer_portno = defer_portno;
571 peer->nr_threads = nr_threads;
572 peer->thread = calloc(nr_threads, sizeof(struct thread));
576 peer->peer_reqs = calloc(nr_ops, sizeof(struct peer_req));
577 if (!peer->peer_reqs){
583 if (!xq_alloc_seq(&peer->free_reqs, nr_ops, nr_ops))
586 if (!xq_alloc_empty(&peer->threads, nr_threads))
589 if (xseg_initialize()){
590 printf("cannot initialize library\n");
593 peer->xseg = join(spec);
597 peer->portno_start = (xport) portno_start;
598 peer->portno_end= (xport) portno_end;
599 port = xseg_bind_port(peer->xseg, peer->portno_start, NULL);
601 printf("cannot bind to port %u\n", (unsigned int) peer->portno_start);
606 for (p = peer->portno_start + 1; p <= peer->portno_end; p++) {
607 struct xseg_port *tmp;
608 tmp = xseg_bind_port(peer->xseg, p, (void *)xseg_get_signal_desc(peer->xseg, port));
610 printf("cannot bind to port %u\n", (unsigned int) p);
615 printf("Peer on ports %u-%u\n", peer->portno_start,
618 for (i = 0; i < nr_ops; i++) {
619 peer->peer_reqs[i].peer = peer;
620 peer->peer_reqs[i].req = NULL;
621 peer->peer_reqs[i].retval = 0;
622 peer->peer_reqs[i].priv = NULL;
623 peer->peer_reqs[i].portno = NoPort;
625 peer->peer_reqs[i].cond = st_cond_new(); //FIXME err check
629 peer->interactive_func = NULL;
634 int pidfile_remove(char *path, int fd)
637 return (unlink(path));
640 int pidfile_write(int pid_fd)
643 snprintf(buf, sizeof(buf), "%ld", syscall(SYS_gettid));
646 lseek(pid_fd, 0, SEEK_SET);
647 int ret = write(pid_fd, buf, strlen(buf));
651 int pidfile_read(char *path, pid_t *pid)
653 char buf[16], *endptr;
656 int fd = open(path, O_RDONLY);
659 int ret = read(fd, buf, 15);
665 *pid = strtol(buf, &endptr, 10);
666 if (endptr != &buf[ret]){
674 int pidfile_open(char *path, pid_t *old_pid)
677 int fd = open(path, O_CREAT|O_EXCL|O_WRONLY);
680 pidfile_read(path, old_pid);
685 void usage(char *argv0)
687 fprintf(stderr, "Usage: %s [general options] [custom peer options]\n\n", argv0);
688 fprintf(stderr, "General peer options:\n"
689 " Option | Default | \n"
690 " --------------------------------------------\n"
691 " -g | None | Segment spec to join\n"
692 " -sp | NoPort | Start portno to bind\n"
693 " -ep | NoPort | End portno to bind\n"
694 " -p | NoPort | Portno to bind\n"
695 " -n | 16 | Number of ops\n"
696 " -v | 0 | Verbosity level\n"
697 " -l | None | Logfile \n"
698 " -d | No | Daemonize \n"
699 " --pidfile | None | Pidfile \n"
701 " -t | No | Number of threads \n"
708 int main(int argc, char *argv[])
710 struct peerd *peer = NULL;
713 long portno_start = -1, portno_end = -1, portno = -1;
716 int daemonize = 0, help = 0;
717 uint32_t nr_ops = 16;
718 uint32_t nr_threads = 1;
719 unsigned int debug_level = 0;
720 uint32_t defer_portno = NoPort;
724 char spec[MAX_SPEC_LEN + 1];
725 char logfile[MAX_LOGFILE_LEN + 1];
726 char pidfile[MAX_PIDFILE_LEN + 1];
732 //capture here -g spec, -n nr_ops, -p portno, -t nr_threads -v verbose level
733 // -dp xseg_portno to defer blocking requests
735 //TODO print messages on arg parsing error
736 BEGIN_READ_ARGS(argc, argv);
737 READ_ARG_STRING("-g", spec, MAX_SPEC_LEN);
738 READ_ARG_ULONG("-sp", portno_start);
739 READ_ARG_ULONG("-ep", portno_end);
740 READ_ARG_ULONG("-p", portno);
741 READ_ARG_ULONG("-n", nr_ops);
742 READ_ARG_ULONG("-v", debug_level);
744 READ_ARG_ULONG("-t", nr_threads);
746 // READ_ARG_ULONG("-dp", defer_portno);
747 READ_ARG_STRING("-l", logfile, MAX_LOGFILE_LEN);
748 READ_ARG_BOOL("-d", daemonize);
749 READ_ARG_BOOL("-h", help);
750 READ_ARG_BOOL("--help", help);
751 READ_ARG_STRING("--pidfile", pidfile, MAX_PIDFILE_LEN);
759 r = init_logctx(&lc, argv[0], debug_level, logfile,
760 REDIRECT_STDOUT|REDIRECT_STDERR);
762 XSEGLOG("Cannot initialize logging to logfile");
765 XSEGLOG2(&lc, D, "Main thread has tid %ld.\n", syscall(SYS_gettid));
768 pid_fd = pidfile_open(pidfile, &old_pid);
771 XSEGLOG2(&lc, E, "Daemon already running, pid: %d.", old_pid);
773 XSEGLOG2(&lc, E, "Cannot open or create pidfile");
780 if (daemon(0, 1) < 0){
781 XSEGLOG2(&lc, E, "Cannot daemonize");
787 pidfile_write(pid_fd);
789 //TODO perform argument sanity checks
790 verbose = debug_level;
792 portno_start = portno;
795 if (portno_start == -1 || portno_end == -1){
796 XSEGLOG2(&lc, E, "Portno or {portno_start, portno_end} must be supplied");
802 peer = peerd_init(nr_ops, spec, portno_start, portno_end, nr_threads, defer_portno);
808 r = custom_peer_init(peer, argc, argv);
813 peerd_start_threads(peer);
817 st_thread_t st = st_thread_create(peerd_loop, peer, 1, 0);
818 r = st_thread_join(st, NULL);
820 r = peerd_loop(peer);
824 pidfile_remove(pidfile, pid_fd);