2 * Copyright 2012 GRNET S.A. All rights reserved.
4 * Redistribution and use in source and binary forms, with or
5 * without modification, are permitted provided that the following
8 * 1. Redistributions of source code must retain the above
9 * copyright notice, this list of conditions and the following
11 * 2. Redistributions in binary form must reproduce the above
12 * copyright notice, this list of conditions and the following
13 * disclaimer in the documentation and/or other materials
14 * provided with the distribution.
16 * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17 * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19 * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20 * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23 * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24 * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26 * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27 * POSSIBILITY OF SUCH DAMAGE.
29 * The views and conclusions contained in the software and
30 * documentation are those of the authors and should not be
31 * interpreted as representing official policies, either expressed
32 * or implied, of GRNET S.A.
38 #include <sys/types.h>
40 #include <sys/syscall.h>
51 #include <xseg/xseg.h>
// NOTE(review): PEER_TYPE is defined twice back to back; presumably each
// definition lives in a different preprocessor branch that is not visible
// here — confirm. The value is passed to xseg_join() in join().
55 #define PEER_TYPE "pthread"
57 #define PEER_TYPE "posix"
60 //FIXME this should not be defined here probably
// Capacities of the spec and pidfile path buffers declared in main().
61 #define MAX_SPEC_LEN 128
62 #define MAX_PIDFILE_LEN 512
// Termination flag; read by isTerminate().
64 volatile unsigned int terminated = 0;
// Verbosity level; main() stores the parsed debug level here and
// renew_logfile() forwards it to renew_logctx().
65 unsigned int verbose = 0;
// Peer handle used by signal_handler() to wake a thread.
72 struct peerd *global_peer;
79 void (*func)(void *arg);
// Pops one index from the head of the peer->threads queue and returns the
// matching element of the peer->thread array.
// NOTE(review): handling of an empty queue is not visible here — confirm.
83 inline static struct thread* alloc_thread(struct peerd *peer)
85 xqindex idx = xq_pop_head(&peer->threads, 1);
88 return peer->thread + idx;
// Computes t's index inside the peer->thread array and hands that index back
// to the peer->threads queue via xq_append_head().
91 inline static void free_thread(struct peerd *peer, struct thread *t)
93 xqindex idx = t - peer->thread;
94 xq_append_head(&peer->threads, idx, 1);
// Signals t->cond while holding t->lock.
98 inline static void __wake_up_thread(struct thread *t)
100 pthread_mutex_lock(&t->lock);
101 pthread_cond_signal(&t->cond);
102 pthread_mutex_unlock(&t->lock);
// Wakes the given thread.
// NOTE(review): the body is not visible here; presumably it delegates to
// __wake_up_thread() — confirm.
105 inline static void wake_up_thread(struct thread* t)
// Signals the peer's first port (peer->portno_start) on its segment and
// returns the result of xseg_signal().
112 inline static int wake_up_next_thread(struct peerd *peer)
114 return (xseg_signal(peer->xseg, peer->portno_start));
119 * extern is needed if this function is going to be called by another file
120 * such as bench-xseg.c
// Reports whether the peer has been asked to terminate. The visible return
// statement combines `terminated` with `!ta` using a bitwise AND.
// NOTE(review): `ta` is not declared in the visible part of the file, and the
// surrounding lines are incomplete — confirm which return statement is
// actually active before relying on this.
122 inline extern int isTerminate()
124 /* ta doesn't need to be taken into account, because the main loops
125 * doesn't check the terminated flag if ta is not 0.
129 return (!ta & terminated);
// Handler installed by setup_signals() for SIGTERM, SIGINT and SIGQUIT.
// Logs the event and signals global_peer's first port via
// wake_up_next_thread().
// NOTE(review): presumably also sets `terminated`; that line is not visible
// here — confirm.
137 void signal_handler(int signal)
139 XSEGLOG2(&lc, I, "Caught signal. Terminating gracefully");
142 wake_up_next_thread(global_peer);
// Handler installed by setup_signals() for SIGUSR1. Logs the event and calls
// renew_logctx() with REOPEN_FILE and the current verbosity.
146 void renew_logfile(int signal)
148 XSEGLOG2(&lc, I, "Caught signal. Renewing logfile");
149 renew_logctx(&lc, NULL, verbose, NULL, REOPEN_FILE);
// Installs the process signal handlers: signal_handler for SIGTERM, SIGINT
// and SIGQUIT, and renew_logfile for SIGUSR1. The handler mask is emptied
// first.
// NOTE(review): how each sigaction() result `r` is checked, and what this
// function returns, is not visible here — confirm.
152 static int setup_signals(struct peerd *peer)
159 sigemptyset(&sa.sa_mask);
161 sa.sa_handler = signal_handler;
162 r = sigaction(SIGTERM, &sa, NULL);
165 r = sigaction(SIGINT, &sa, NULL);
168 r = sigaction(SIGQUIT, &sa, NULL);
172 sa.sa_handler = renew_logfile;
173 r = sigaction(SIGUSR1, &sa, NULL);
// Returns non-zero when the peer has a defer port configured, i.e. when
// peer->defer_portno differs from NoPort.
180 inline int canDefer(struct peerd *peer)
182 return !(peer->defer_portno == NoPort);
// Prints the fields of an xseg request to stdout, together with local copies
// of its target name (at most 63 bytes, clamped via `end`) and of the first
// 63 bytes of its data buffer.
// NOTE(review): strncpy() does not NUL-terminate on truncation; the
// terminating writes to target/data are presumably in lines not visible
// here — confirm. The data copy always reads 63 bytes regardless of
// req->datalen — confirm the buffer is always at least that large.
185 void print_req(struct xseg *xseg, struct xseg_request *req)
187 char target[64], data[64];
188 char *req_target, *req_data;
189 unsigned int end = (req->targetlen> 63) ? 63 : req->targetlen;
190 req_target = xseg_get_target(xseg, req);
191 req_data = xseg_get_data(xseg, req);
194 strncpy(target, req_target, end);
196 strncpy(data, req_data, 63);
198 printf("req id:%lu, op:%u %llu:%lu serviced: %lu, reqstate: %u\n"
199 "src: %u, transit: %u, dst: %u effective dst: %u\n"
200 "target[%u]:'%s', data[%llu]:\n%s------------------\n\n",
201 (unsigned long)(req),
202 (unsigned int)req->op,
203 (unsigned long long)req->offset,
204 (unsigned long)req->size,
205 (unsigned long)req->serviced,
206 (unsigned int)req->state,
207 (unsigned int)req->src_portno,
208 (unsigned int)req->transit_portno,
209 (unsigned int)req->dst_portno,
210 (unsigned int)req->effective_dst_portno,
211 (unsigned int)req->targetlen, target,
212 (unsigned long long)req->datalen, data);
// Prints a peer request to stdout: its index within peer->peer_reqs, the
// fields of the underlying xseg request, pr->retval, and local copies of the
// target name (at most 63 bytes) and the first 63 bytes of data.
// NOTE(review): the format string starts with "%s:", presumably fed by `msg`
// on a line not visible here — confirm. NUL-termination of target/data
// after strncpy() is likewise not visible — confirm.
215 void log_pr(char *msg, struct peer_req *pr)
217 char target[64], data[64];
218 char *req_target, *req_data;
219 struct peerd *peer = pr->peer;
220 struct xseg *xseg = pr->peer->xseg;
221 req_target = xseg_get_target(xseg, pr->req);
222 req_data = xseg_get_data(xseg, pr->req);
223 /* null terminate name in case of req->target is less than 63 characters,
224 * and next character after name (aka first byte of next buffer) is not
227 unsigned int end = (pr->req->targetlen> 63) ? 63 : pr->req->targetlen;
229 strncpy(target, req_target, end);
231 strncpy(data, req_data, 63);
233 printf("%s: req id:%u, op:%u %llu:%lu serviced: %lu, retval: %lu, reqstate: %u\n"
234 "target[%u]:'%s', data[%llu]:\n%s------------------\n\n",
236 (unsigned int)(pr - peer->peer_reqs),
237 (unsigned int)pr->req->op,
238 (unsigned long long)pr->req->offset,
239 (unsigned long)pr->req->size,
240 (unsigned long)pr->req->serviced,
241 (unsigned long)pr->retval,
242 (unsigned int)pr->req->state,
243 (unsigned int)pr->req->targetlen, target,
244 (unsigned long long)pr->req->datalen, data);
// Pops one index from the head of peer->free_reqs and returns the matching
// element of the peer->peer_reqs array.
// NOTE(review): handling of an empty queue is not visible here — confirm.
248 inline struct peer_req *alloc_peer_req(struct peerd *peer)
250 xqindex idx = xq_pop_head(&peer->free_reqs, 1);
253 return peer->peer_reqs + idx;
// Computes pr's index inside peer->peer_reqs and hands that index back to the
// peer->free_reqs queue via xq_append_head().
256 inline void free_peer_req(struct peerd *peer, struct peer_req *pr)
258 xqindex idx = pr - peer->peer_reqs;
260 xq_append_head(&peer->free_reqs, idx, 1);
// Timing state for responds, plus a printer for it. get_responds_stats()
// prints the accumulated time in resp_accum and the `responds` counter.
// NOTE(review): the only visible code that would update resp_accum is
// commented out in complete(); where `responds` is incremented is not
// visible — confirm.
263 struct timeval resp_start, resp_end, resp_accum = {0, 0};
264 uint64_t responds = 0;
265 void get_responds_stats(){
266 printf("Time waiting respond %lu.%06lu sec for %llu times.\n",
267 //(unsigned int)(t - peer->thread),
268 resp_accum.tv_sec, resp_accum.tv_usec, (long long unsigned int) responds);
// Fails a peer request: sets XS_FAILED on the underlying xseg request,
// responds on pr->portno, signals the port returned by xseg_respond(),
// releases pr back to the free list and signals the peer's first port.
// NOTE(review): the declaration of `p` and any check of xseg_respond()'s
// result are not visible here — confirm.
272 void fail(struct peerd *peer, struct peer_req *pr)
274 struct xseg_request *req = pr->req;
276 XSEGLOG2(&lc, D, "failing req %u", (unsigned int) (pr - peer->peer_reqs));
277 req->state |= XS_FAILED;
278 //xseg_set_req_data(peer->xseg, pr->req, NULL);
279 p = xseg_respond(peer->xseg, req, pr->portno, X_ALLOC);
280 xseg_signal(peer->xseg, p);
281 free_peer_req(peer, pr);
283 wake_up_next_thread(peer);
// Completes a peer request: sets XS_SERVED on the underlying xseg request,
// responds on pr->portno, signals the port returned by xseg_respond(),
// releases pr back to the free list and signals the peer's first port.
// The respond-timing instrumentation is commented out.
// NOTE(review): the declaration of `p` and any check of xseg_respond()'s
// result are not visible here — confirm.
288 void complete(struct peerd *peer, struct peer_req *pr)
290 struct xseg_request *req = pr->req;
292 req->state |= XS_SERVED;
293 //xseg_set_req_data(peer->xseg, pr->req, NULL);
294 //gettimeofday(&resp_start, NULL);
295 p = xseg_respond(peer->xseg, req, pr->portno, X_ALLOC);
296 //gettimeofday(&resp_end, NULL);
298 //timersub(&resp_end, &resp_start, &resp_end);
299 //timeradd(&resp_end, &resp_accum, &resp_accum);
300 //printf("xseg_signal: %u\n", p);
301 xseg_signal(peer->xseg, p);
302 free_peer_req(peer, pr);
304 wake_up_next_thread(peer);
// Handles a newly accepted request: logs, then passes it to dispatch() with
// reason dispatch_accept.
// NOTE(review): `xreq` is not used in the visible lines; it may be used on
// lines not shown here — confirm.
308 static void handle_accepted(struct peerd *peer, struct peer_req *pr,
309 struct xseg_request *req)
311 struct xseg_request *xreq = pr->req;
312 //assert xreq == req;
313 XSEGLOG2(&lc, D, "Handle accepted");
315 //xreq->state = XS_ACCEPTED;
317 dispatch(peer, pr, req, dispatch_accept);
// Handles a received request: logs, then passes it to dispatch() with reason
// dispatch_receive.
320 static void handle_received(struct peerd *peer, struct peer_req *pr,
321 struct xseg_request *req)
323 //struct xseg_request *req = pr->req;
324 //assert req->state != XS_ACCEPTED;
325 XSEGLOG2(&lc, D, "Handle received \n");
326 dispatch(peer, pr, req, dispatch_receive);
// Timing state for submits, plus a printer for it. get_submits_stats()
// prints the accumulated time in sub_accum and the `submits` counter.
// NOTE(review): the only visible code that would update sub_accum is
// commented out in submit_peer_req(); where `submits` is incremented is not
// visible — confirm.
329 struct timeval sub_start, sub_end, sub_accum = {0, 0};
330 uint64_t submits = 0;
331 void get_submits_stats(){
332 printf("Time waiting submit %lu.%06lu sec for %llu times.\n",
333 //(unsigned int)(t - peer->thread),
334 sub_accum.tv_sec, sub_accum.tv_usec, (long long unsigned int) submits);
// Submits pr's xseg request: stores pr as the request's data (so it can be
// recovered with xseg_get_req_data() when the reply is received), submits on
// pr->portno and signals the port returned by xseg_submit(). The
// submit-timing instrumentation is commented out.
// NOTE(review): the checks on `ret` after xseg_set_req_data()/xseg_submit()
// and the return value are not visible here — confirm.
337 int submit_peer_req(struct peerd *peer, struct peer_req *pr)
340 struct xseg_request *req = pr->req;
341 // assert req->portno == peer->portno ?
342 //TODO small function with error checking
343 XSEGLOG2 (&lc, D, "submitting peer req %u\n", (unsigned int)(pr - peer->peer_reqs));
344 ret = xseg_set_req_data(peer->xseg, req, (void *)(pr));
347 //printf("pr: %x , req_data: %x \n", pr, xseg_get_req_data(peer->xseg, req));
348 //gettimeofday(&sub_start, NULL);
349 ret = xseg_submit(peer->xseg, req, pr->portno, X_ALLOC);
350 //gettimeofday(&sub_end, NULL);
352 //timersub(&sub_end, &sub_start, &sub_end);
353 //timeradd(&sub_end, &sub_accum, &sub_accum);
356 xseg_signal(peer->xseg, ret);
// Polls every port in [portno_start, portno_end] once, without blocking.
// Accept side: unless the peer is terminating, a peer_req is allocated and
// xseg_accept() is tried; an accepted request goes to handle_accepted(),
// and the visible free_peer_req() call returns the unused pr.
// Receive side: xseg_receive() is tried and the owning peer_req is recovered
// from the request data, then passed to handle_received(). A request with no
// pr data is responded to from portno_start, or put back if that fails.
// NOTE(review): the branch conditions and the return value are on lines not
// visible here. Callers test the result in `if (check_ports(peer))`,
// presumably non-zero when some request was handled — confirm.
360 int check_ports(struct peerd *peer)
362 struct xseg *xseg = peer->xseg;
363 xport portno_start = peer->portno_start;
364 xport portno_end = peer->portno_end;
365 struct xseg_request *accepted, *received;
370 for (i = portno_start; i <= portno_end; i++) {
373 if (!isTerminate()) {
374 pr = alloc_peer_req(peer);
376 accepted = xseg_accept(xseg, i, X_NONBLOCK);
380 xseg_cancel_wait(xseg, i);
381 handle_accepted(peer, pr, accepted);
385 free_peer_req(peer, pr);
389 received = xseg_receive(xseg, i, X_NONBLOCK);
391 r = xseg_get_req_data(xseg, received, (void **) &pr);
393 XSEGLOG2(&lc, W, "Received request with no pr data\n");
394 xport p = xseg_respond(peer->xseg, received, peer->portno_start, X_ALLOC);
396 XSEGLOG2(&lc, W, "Could not respond stale request");
397 xseg_put_request(xseg, received, portno_start);
400 xseg_signal(xseg, p);
403 //maybe perform sanity check for pr
404 xseg_cancel_wait(xseg, i);
405 handle_received(peer, pr, received);
// Asks for `func(arg)` to be run on a worker thread; begins by taking a
// thread slot with alloc_thread().
// NOTE(review): the rest of the body (how func/arg are handed to the thread
// and what is returned) is not visible here — confirm.
415 int thread_execute(struct peerd *peer, void (*func)(void *arg), void *arg)
417 struct thread *t = alloc_thread(peer);
424 // we could hijack a thread
// Worker thread entry point (started by peerd_start_threads()); `arg` is the
// thread's own struct thread. After initialising local signalling on the
// peer's first port it loops until isTerminate() is true AND every peer_req
// is back in free_reqs (count equals nr_ops). Each pass polls the ports via
// check_ports() up to `threshold` times, then sleeps in xseg_wait_signal().
// On exit it signals the next thread and calls custom_peer_finalize().
// `threshold` is 1000 divided by the number of ports served.
// NOTE(review): the branch that runs a queued function ("executes function")
// and the loop-exit details are on lines not visible here — confirm.
428 static void* thread_loop(void *arg)
430 struct thread *t = (struct thread *) arg;
431 struct peerd *peer = t->peer;
432 struct xseg *xseg = peer->xseg;
433 xport portno_start = peer->portno_start;
434 xport portno_end = peer->portno_end;
435 pid_t pid =syscall(SYS_gettid);
437 uint64_t threshold=1000/(1 + portno_end - portno_start);
439 XSEGLOG2(&lc, D, "thread %u\n", (unsigned int) (t- peer->thread));
441 XSEGLOG2(&lc, I, "Thread %u has tid %u.\n", (unsigned int) (t- peer->thread), pid);
442 xseg_init_local_signal(xseg, peer->portno_start);
443 for (;!(isTerminate() && xq_count(&peer->free_reqs) == peer->nr_ops);) {
444 XSEGLOG("Head of loop.\n");
446 XSEGLOG2(&lc, D, "Thread %u executes function\n", (unsigned int) (t- peer->thread));
447 xseg_cancel_wait(xseg, peer->portno_start);
454 for(loops = threshold; loops > 0; loops--) {
456 xseg_prepare_wait(xseg, peer->portno_start);
457 if (check_ports(peer))
460 XSEGLOG2(&lc, I, "Thread %u goes to sleep\n", (unsigned int) (t- peer->thread));
461 xseg_wait_signal(xseg, 10000000UL);
462 xseg_cancel_wait(xseg, peer->portno_start);
463 XSEGLOG2(&lc, I, "Thread %u woke up\n", (unsigned int) (t- peer->thread));
465 wake_up_next_thread(peer);
466 custom_peer_finalize(peer);
// Initialises and starts peer->nr_threads worker threads. For each slot it
// sets the back-pointer to the peer, initialises the condition variable and
// mutex, and creates a pthread running thread_loop() on that slot.
// NOTE(review): func/arg are reset to NULL after pthread_create(), so the new
// thread may already be running when they are written — confirm this is
// intended. The results of the pthread_* calls are not checked in the visible
// lines.
470 int peerd_start_threads(struct peerd *peer)
473 uint32_t nr_threads = peer->nr_threads;
475 for (i = 0; i < nr_threads; i++) {
476 peer->thread[i].peer = peer;
477 pthread_cond_init(&peer->thread[i].cond,NULL);
478 pthread_mutex_init(&peer->thread[i].lock, NULL);
479 pthread_create(&peer->thread[i].tid, NULL, thread_loop, (void *)(peer->thread + i));
480 peer->thread[i].func = NULL;
481 peer->thread[i].arg = NULL;
// Forwards pr's request to the peer's defer port (peer->defer_portno) with
// xseg_forward(), signals the port it returns, and releases pr. Logs an
// error if the peer has no defer port or the forward fails, and a warning if
// the signal fails.
// NOTE(review): the failure conditions and return values are on lines not
// visible here — confirm.
489 int defer_request(struct peerd *peer, struct peer_req *pr)
493 if (!canDefer(peer)){
494 XSEGLOG2(&lc, E, "Peer cannot defer requests");
497 p = xseg_forward(peer->xseg, pr->req, peer->defer_portno, pr->portno,
500 XSEGLOG2(&lc, E, "Cannot defer request %lx", pr->req);
503 r = xseg_signal(peer->xseg, p);
505 XSEGLOG2(&lc, W, "Cannot signal port %lu", p);
507 free_peer_req(peer, pr);
// Main loop of the peer. Two variants are visible.
// (1) Run the optional interactive function, then join every worker thread.
// (2) Poll the ports directly: after initialising local signalling, loop
//     until isTerminate() is true and all peer_reqs are free, polling
//     check_ports() up to `threshold` times per pass and sleeping in
//     xseg_wait_signal() in between.
// Afterwards custom_peer_finalize() is called and local signalling is torn
// down.
// NOTE(review): the two variants are presumably selected by a build-time
// conditional that is not visible here — confirm.
511 static int peerd_loop(struct peerd *peer)
515 if (peer->interactive_func)
516 peer->interactive_func();
517 for (i = 0; i < peer->nr_threads; i++) {
518 pthread_join(peer->thread[i].tid, NULL);
521 struct xseg *xseg = peer->xseg;
522 xport portno_start = peer->portno_start;
523 xport portno_end = peer->portno_end;
524 uint64_t threshold=1000/(1 + portno_end - portno_start);
525 pid_t pid =syscall(SYS_gettid);
528 XSEGLOG2(&lc, I, "Peer has tid %u.\n", pid);
529 xseg_init_local_signal(xseg, peer->portno_start);
530 for (;!(isTerminate() && xq_count(&peer->free_reqs) == peer->nr_ops);) {
531 for(loops= threshold; loops > 0; loops--) {
533 xseg_prepare_wait(xseg, peer->portno_start);
534 if (check_ports(peer))
542 XSEGLOG2(&lc, I, "Peer goes to sleep\n");
543 xseg_wait_signal(xseg, 10000000UL);
544 xseg_cancel_wait(xseg, peer->portno_start);
545 XSEGLOG2(&lc, I, "Peer woke up\n");
550 custom_peer_finalize(peer);
551 xseg_quit_local_signal(xseg, peer->portno_start);
// Parses the segment spec and joins the segment as a PEER_TYPE peer. A
// second path creates the segment with xseg_create() and joins again.
// The results of xseg_parse_spec() and xseg_create() are explicitly ignored.
// NOTE(review): the condition that selects the create-and-retry path is not
// visible; presumably it is taken when the first xseg_join() fails — confirm.
556 static struct xseg *join(char *spec)
558 struct xseg_config config;
561 (void)xseg_parse_spec(spec, &config);
562 xseg = xseg_join(config.type, config.name, PEER_TYPE, NULL);
566 (void)xseg_create(&config);
567 return xseg_join(config.type, config.name, PEER_TYPE, NULL);
// Allocates and initialises a struct peerd: the thread and peer_req arrays,
// the free_reqs queue (allocated with xq_alloc_seq) and the threads queue
// (allocated with xq_alloc_empty). It then initialises the xseg library,
// joins the segment named by `spec`, binds portno_start, and binds every
// further port up to portno_end using the first port's signal descriptor.
// Finally every peer_req is reset to its idle state.
// NOTE(review): the error paths (what is freed and returned on each failure)
// are on lines not visible here — confirm. The st_cond_new() result is
// unchecked, as the existing FIXME says.
570 static struct peerd* peerd_init(uint32_t nr_ops, char* spec, long portno_start,
571 long portno_end, uint32_t nr_threads, xport defer_portno)
575 struct xseg_port *port;
580 peer = malloc(sizeof(struct peerd));
585 peer->nr_ops = nr_ops;
586 peer->defer_portno = defer_portno;
588 peer->nr_threads = nr_threads;
589 peer->thread = calloc(nr_threads, sizeof(struct thread));
593 peer->peer_reqs = calloc(nr_ops, sizeof(struct peer_req));
594 if (!peer->peer_reqs){
600 if (!xq_alloc_seq(&peer->free_reqs, nr_ops, nr_ops))
603 if (!xq_alloc_empty(&peer->threads, nr_threads))
606 if (xseg_initialize()){
607 printf("cannot initialize library\n");
610 peer->xseg = join(spec);
614 peer->portno_start = (xport) portno_start;
615 peer->portno_end= (xport) portno_end;
616 port = xseg_bind_port(peer->xseg, peer->portno_start, NULL);
618 printf("cannot bind to port %u\n", (unsigned int) peer->portno_start);
623 for (p = peer->portno_start + 1; p <= peer->portno_end; p++) {
624 struct xseg_port *tmp;
625 tmp = xseg_bind_port(peer->xseg, p, (void *)xseg_get_signal_desc(peer->xseg, port));
627 printf("cannot bind to port %u\n", (unsigned int) p);
632 printf("Peer on ports %u-%u\n", peer->portno_start,
635 for (i = 0; i < nr_ops; i++) {
636 peer->peer_reqs[i].peer = peer;
637 peer->peer_reqs[i].req = NULL;
638 peer->peer_reqs[i].retval = 0;
639 peer->peer_reqs[i].priv = NULL;
640 peer->peer_reqs[i].portno = NoPort;
642 peer->peer_reqs[i].cond = st_cond_new(); //FIXME err check
646 peer->interactive_func = NULL;
// Removes the pidfile at `path` and returns unlink()'s result.
// NOTE(review): what is done with `fd` (presumably closed) is not visible
// here — confirm.
651 int pidfile_remove(char *path, int fd)
654 return (unlink(path));
// Writes the caller's id, formatted as a decimal string, at the start of the
// already-open pidfile descriptor.
// NOTE(review): the value written comes from syscall(SYS_gettid), i.e. a
// thread id rather than getpid() — confirm this is intended. The declaration
// of `buf` and the handling of write()'s result are not visible here.
657 int pidfile_write(int pid_fd)
660 snprintf(buf, sizeof(buf), "%ld", syscall(SYS_gettid));
663 lseek(pid_fd, 0, SEEK_SET);
664 int ret = write(pid_fd, buf, strlen(buf));
// Reads a pid from the file at `path`: opens it read-only, reads at most 15
// bytes into a 16-byte buffer and parses them with strtol() in base 10. The
// parse is checked by comparing endptr against &buf[ret], i.e. every byte
// read must have been consumed as part of the number.
// NOTE(review): error handling for open()/read(), NUL-termination of buf,
// closing of fd and the return values are on lines not visible here —
// confirm.
668 int pidfile_read(char *path, pid_t *pid)
670 char buf[16], *endptr;
673 int fd = open(path, O_RDONLY);
676 int ret = read(fd, buf, 15);
682 *pid = strtol(buf, &endptr, 10);
683 if (endptr != &buf[ret]){
// Exclusively creates the pidfile at `path` for writing (O_CREAT|O_EXCL) and
// calls pidfile_read() to fetch the pid stored in an existing file into
// *old_pid.
// NOTE(review): the file is created with mode S_IWUSR only (owner write, no
// read permission), while pidfile_read() opens it O_RDONLY; an unprivileged
// process may be unable to read it back — confirm. The condition guarding
// pidfile_read() (presumably open() failing) and the return value are not
// visible here.
691 int pidfile_open(char *path, pid_t *old_pid)
694 int fd = open(path, O_CREAT|O_EXCL|O_WRONLY, S_IWUSR);
697 pidfile_read(path, old_pid);
// Prints the command-line help for the general peer options to stderr;
// argv0 is used as the program name in the usage line.
// NOTE(review): some option rows and the end of the fprintf() call are on
// lines not visible here.
702 void usage(char *argv0)
704 fprintf(stderr, "Usage: %s [general options] [custom peer options]\n\n", argv0);
705 fprintf(stderr, "General peer options:\n"
706 " Option | Default | \n"
707 " --------------------------------------------\n"
708 " -g | None | Segment spec to join\n"
709 " -sp | NoPort | Start portno to bind\n"
710 " -ep | NoPort | End portno to bind\n"
711 " -p | NoPort | Portno to bind\n"
712 " -n | 16 | Number of ops\n"
713 " -v | 0 | Verbosity level\n"
714 " -l | None | Logfile \n"
715 " -d | No | Daemonize \n"
716 " --pidfile | None | Pidfile \n"
718 " -t | No | Number of threads \n"
// Program entry point. Visible sequence: parse the general options,
// initialise logging, open the pidfile, optionally daemonize, write the
// pidfile, validate the port arguments, build the peer with peerd_init(),
// run custom_peer_init(), start the worker threads, run the main loop, and
// finally remove the pidfile.
// NOTE(review): most conditionals, error exits and build-time switches are on
// lines not visible here; the comments below describe only what is shown.
725 int main(int argc, char *argv[])
727 struct peerd *peer = NULL;
730 long portno_start = -1, portno_end = -1, portno = -1;
733 int daemonize = 0, help = 0;
734 uint32_t nr_ops = 16;
735 uint32_t nr_threads = 1;
736 unsigned int debug_level = 0;
737 xport defer_portno = NoPort;
741 char spec[MAX_SPEC_LEN + 1];
742 char logfile[MAX_LOGFILE_LEN + 1];
743 char pidfile[MAX_PIDFILE_LEN + 1];
749 //capture here -g spec, -n nr_ops, -p portno, -t nr_threads -v verbose level
750 // -dp xseg_portno to defer blocking requests
752 //TODO print messages on arg parsing error
// NOTE(review): READ_ARG_ULONG is applied to variables of different types
// (long, uint32_t, unsigned int, xport); the macro is defined elsewhere —
// confirm it stores correctly into each.
753 BEGIN_READ_ARGS(argc, argv);
754 READ_ARG_STRING("-g", spec, MAX_SPEC_LEN);
755 READ_ARG_ULONG("-sp", portno_start);
756 READ_ARG_ULONG("-ep", portno_end);
757 READ_ARG_ULONG("-p", portno);
758 READ_ARG_ULONG("-n", nr_ops);
759 READ_ARG_ULONG("-v", debug_level);
761 READ_ARG_ULONG("-t", nr_threads);
763 READ_ARG_ULONG("-dp", defer_portno);
764 READ_ARG_STRING("-l", logfile, MAX_LOGFILE_LEN);
765 READ_ARG_BOOL("-d", daemonize);
766 READ_ARG_BOOL("-h", help);
767 READ_ARG_BOOL("--help", help);
768 READ_ARG_STRING("--pidfile", pidfile, MAX_PIDFILE_LEN);
// Logging is initialised with stdout and stderr redirection requested.
776 r = init_logctx(&lc, argv[0], debug_level, logfile,
777 REDIRECT_STDOUT|REDIRECT_STDERR);
779 XSEGLOG("Cannot initialize logging to logfile");
782 XSEGLOG2(&lc, D, "Main thread has tid %ld.\n", syscall(SYS_gettid));
785 pid_fd = pidfile_open(pidfile, &old_pid);
788 XSEGLOG2(&lc, E, "Daemon already running, pid: %d.", old_pid);
790 XSEGLOG2(&lc, E, "Cannot open or create pidfile");
// daemon(0, 1): change the working directory to "/" but keep the standard
// descriptors open.
797 if (daemon(0, 1) < 0){
798 XSEGLOG2(&lc, E, "Cannot daemonize");
804 pidfile_write(pid_fd);
806 //TODO perform argument sanity checks
807 verbose = debug_level;
// NOTE(review): presumably a single -p value is used for both ends of the
// port range; only the portno_start assignment is visible — confirm.
809 portno_start = portno;
812 if (portno_start == -1 || portno_end == -1){
813 XSEGLOG2(&lc, E, "Portno or {portno_start, portno_end} must be supplied");
819 peer = peerd_init(nr_ops, spec, portno_start, portno_end, nr_threads, defer_portno);
825 r = custom_peer_init(peer, argc, argv);
830 peerd_start_threads(peer);
// NOTE(review): several ways of running the main loop follow (an st thread, a
// custom loop, the default peerd_loop); the conditions selecting between them
// are only partly visible — confirm.
834 st_thread_t st = st_thread_create(peerd_loop, peer, 1, 0);
835 r = st_thread_join(st, NULL);
837 if (peer->custom_peerd_loop)
838 r = peer->custom_peerd_loop(peer);
840 r = peerd_loop(peer);
844 pidfile_remove(pidfile, pid_fd);