2 * Copyright 2012 GRNET S.A. All rights reserved.
4 * Redistribution and use in source and binary forms, with or
5 * without modification, are permitted provided that the following
8 * 1. Redistributions of source code must retain the above
9 * copyright notice, this list of conditions and the following
11 * 2. Redistributions in binary form must reproduce the above
12 * copyright notice, this list of conditions and the following
13 * disclaimer in the documentation and/or other materials
14 * provided with the distribution.
16 * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17 * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19 * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20 * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23 * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24 * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26 * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27 * POSSIBILITY OF SUCH DAMAGE.
29 * The views and conclusions contained in the software and
30 * documentation are those of the authors and should not be
31 * interpreted as representing official policies, either expressed
32 * or implied, of GRNET S.A.
38 #include <sys/types.h>
40 #include <sys/syscall.h>
42 #include <sys/resource.h>
52 #include <xseg/xseg.h>
/*
 * Build-time configuration and process-wide state for the peer daemon.
 * NOTE(review): the preprocessor conditionals around the #error and the two
 * PEER_TYPE definitions are not visible in this listing; presumably "pthread"
 * is selected for MT builds and "posix" otherwise -- confirm.
 */
57 #error "MT and ST_THREADS defines are mutually exclusive"
62 #define PEER_TYPE "pthread"
64 #define PEER_TYPE "posix"
/* The three MAX_*_LEN values size the option-string buffers declared in main(). */
67 //FIXME this should not be defined here probably
68 #define MAX_SPEC_LEN 128
69 #define MAX_PIDFILE_LEN 512
70 #define MAX_CPUS_LEN 512
72 /* Define the cpus on which the threads/process will be pinned */
78 struct cpu_list cpu_list;
/* Termination flag (presumably set from signal_handler() and read through
 * isTerminate(); neither is visible in this listing -- confirm). */
79 volatile unsigned int terminated = 0;
/* Log verbosity: main() copies the -v value here, renew_logfile() reads it. */
80 unsigned int verbose = 0;
/* Peer handle used by signal_handler() to wake a thread. */
87 struct peerd *global_peer;
/* Thread-specific key: created in peerd_init(), set in init_thread_loop(). */
88 static pthread_key_t threadkey;
/*
 * Signal the peer's first port (peer->portno_start) with xseg_signal() and
 * return that call's result. Used throughout this file to nudge another
 * thread after a request is finished or when a loop exits.
 */
90 inline static int wake_up_next_thread(struct peerd *peer)
92 return (xseg_signal(peer->xseg, peer->portno_start));
97 * extern is needed if this function is going to be called by another file
98 * such as bench-xseg.c
/*
 * Handler installed by setup_signals() for SIGTERM, SIGINT and SIGQUIT.
 * Wakes a peer thread through global_peer.
 * NOTE(review): the statement that records the termination request is not
 * visible in this listing (presumably it sets `terminated`) -- confirm.
 */
101 void signal_handler(int signal)
103 // XSEGLOG2(&lc, I, "Caught signal. Terminating gracefully");
106 wake_up_next_thread(global_peer);
111 * We want to both print the backtrace and dump the core. To do so, we fork a
112 * process that prints its stack trace, which should be the same as the parent's.
113 * Then we wait for 1 sec and we abort. The reason we don't abort immediately
114 * is because it may interrupt the printing of the backtrace.
/*
 * Handler installed by setup_signals() for SIGSEGV.
 * NOTE(review): the body is not visible in this listing; according to the
 * comment fragment preceding this definition it is meant to print a backtrace
 * from a forked child and then abort -- confirm against the full source.
 */
116 void segv_handler(int signal)
/*
 * Handler installed by setup_signals() for SIGUSR1: calls renew_logctx() on
 * the logging context `lc` with REOPEN_FILE and the current `verbose` level.
 */
127 void renew_logfile(int signal)
129 // XSEGLOG2(&lc, I, "Caught signal. Renewing logfile");
130 renew_logctx(&lc, NULL, verbose, NULL, REOPEN_FILE);
/*
 * Install the process signal handlers and raise the core-file limit.
 *
 *  - SIGTERM, SIGINT, SIGQUIT -> signal_handler
 *  - SIGSEGV                  -> segv_handler
 *  - SIGUSR1                  -> renew_logfile
 *
 * The soft RLIMIT_CORE limit is raised to the hard limit in between.
 * `peer` is not used by any visible statement.
 * NOTE(review): the checks on `r` and the return statements are not visible in
 * this listing, so the exact error behaviour cannot be stated -- confirm.
 */
133 static int setup_signals(struct peerd *peer)
141 sigemptyset(&sa.sa_mask);
143 sa.sa_handler = signal_handler;
144 r = sigaction(SIGTERM, &sa, NULL);
147 r = sigaction(SIGINT, &sa, NULL);
150 r = sigaction(SIGQUIT, &sa, NULL);
155 * Get the current limits for core files and raise them to the largest
158 if (getrlimit(RLIMIT_CORE, &rlim) < 0)
161 rlim.rlim_cur = rlim.rlim_max;
163 if (setrlimit(RLIMIT_CORE, &rlim) < 0)
166 /* Install handler for segfaults */
167 sa.sa_handler = segv_handler;
168 r = sigaction(SIGSEGV, &sa, NULL);
172 sa.sa_handler = renew_logfile;
173 r = sigaction(SIGUSR1, &sa, NULL);
/*
 * Return non-zero when the peer has a defer port configured, i.e. when
 * peer->defer_portno differs from NoPort.
 */
180 inline int canDefer(struct peerd *peer)
182 return !(peer->defer_portno == NoPort);
/*
 * Debug helper: print the main fields of an xseg request to stdout, together
 * with local copies of its target name and of the start of its data buffer.
 *
 * At most 63 bytes of the target are copied (fewer when targetlen is smaller).
 * NOTE(review): 63 bytes of data are always requested from strncpy, regardless
 * of req->datalen. The statements that NUL-terminate `target` and `data` are
 * not visible in this listing -- confirm they exist, since strncpy alone does
 * not guarantee termination.
 * The "req id" printed is the request pointer cast to unsigned long.
 */
185 void print_req(struct xseg *xseg, struct xseg_request *req)
187 char target[64], data[64];
188 char *req_target, *req_data;
189 unsigned int end = (req->targetlen> 63) ? 63 : req->targetlen;
190 req_target = xseg_get_target(xseg, req);
191 req_data = xseg_get_data(xseg, req);
194 strncpy(target, req_target, end);
196 strncpy(data, req_data, 63);
198 printf("req id:%lu, op:%u %llu:%lu serviced: %lu, reqstate: %u\n"
199 "src: %u, transit: %u, dst: %u effective dst: %u\n"
200 "target[%u]:'%s', data[%llu]:\n%s------------------\n\n",
201 (unsigned long)(req),
202 (unsigned int)req->op,
203 (unsigned long long)req->offset,
204 (unsigned long)req->size,
205 (unsigned long)req->serviced,
206 (unsigned int)req->state,
207 (unsigned int)req->src_portno,
208 (unsigned int)req->transit_portno,
209 (unsigned int)req->dst_portno,
210 (unsigned int)req->effective_dst_portno,
211 (unsigned int)req->targetlen, target,
212 (unsigned long long)req->datalen, data);
/*
 * Debug helper: print a peer request to stdout. The "req id" is the index of
 * `pr` inside peer->peer_reqs; the remaining fields come from pr->req and
 * pr->retval, followed by local copies of the target name and data start.
 *
 * NOTE(review): as in print_req(), 63 bytes of data are always requested from
 * strncpy regardless of datalen, and the NUL-terminating statements are not
 * visible in this listing -- confirm.
 * NOTE(review): the first printf argument (matching the leading "%s",
 * presumably `msg`) is on a line missing from this listing -- confirm.
 */
215 void log_pr(char *msg, struct peer_req *pr)
217 char target[64], data[64];
218 char *req_target, *req_data;
219 struct peerd *peer = pr->peer;
220 struct xseg *xseg = pr->peer->xseg;
221 req_target = xseg_get_target(xseg, pr->req);
222 req_data = xseg_get_data(xseg, pr->req);
223 /* null terminate name in case of req->target is less than 63 characters,
224 * and next character after name (aka first byte of next buffer) is not
227 unsigned int end = (pr->req->targetlen> 63) ? 63 : pr->req->targetlen;
229 strncpy(target, req_target, end);
231 strncpy(data, req_data, 63);
233 printf("%s: req id:%u, op:%u %llu:%lu serviced: %lu, retval: %lu, reqstate: %u\n"
234 "target[%u]:'%s', data[%llu]:\n%s------------------\n\n",
236 (unsigned int)(pr - peer->peer_reqs),
237 (unsigned int)pr->req->op,
238 (unsigned long long)pr->req->offset,
239 (unsigned long)pr->req->size,
240 (unsigned long)pr->req->serviced,
241 (unsigned long)pr->retval,
242 (unsigned int)pr->req->state,
243 (unsigned int)pr->req->targetlen, target,
244 (unsigned long long)pr->req->datalen, data);
/*
 * Two-argument (per-thread) variant: take a free peer_req slot for thread `t`.
 *
 * First pops an index from t's own free_thread_reqs queue; the loop below
 * then tries the queues of the other threads ("steal"). The slot is
 * peer->peer_reqs + idx and is tagged with the index of `t` in peer->thread.
 *
 * NOTE(review): the loop variable already starts at t->thread_no + 1, yet the
 * array index adds t->thread_no again, so the queues visited are offset by
 * thread_no from the intended "every other thread" order (and may include t's
 * own queue). `i % peer->nr_threads` looks like the intended index -- confirm.
 * NOTE(review): the empty-queue checks, loop exits and return statements are
 * not visible in this listing.
 */
249 inline struct peer_req *alloc_peer_req(struct peerd *peer, struct thread *t)
252 xqindex idx = xq_pop_head(&t->free_thread_reqs, t->thread_no);
256 /* try to steal from another thread */
260 for (i = t->thread_no + 1; i < (t->thread_no + peer->nr_threads); i++) {
261 nt = &peer->thread[(t->thread_no + i) % peer->nr_threads];
262 if (!xq_count(&nt->free_thread_reqs))
264 idx = xq_pop_head(&nt->free_thread_reqs, t->thread_no);
271 pr = peer->peer_reqs + idx;
272 pr->thread_no = t - peer->thread;
277 * free_reqs is a queue that simply contains indices into the peer_reqs
278 * array. If an index is popped from free_reqs, we are certain that the
279 * associated entry in peer_reqs is free to use
/*
 * One-argument variant: pop an index from peer->free_reqs and return the
 * corresponding entry of peer->peer_reqs.
 * NOTE(review): the handling of an empty queue is on a line not visible in
 * this listing -- confirm what is returned in that case.
 */
281 inline struct peer_req *alloc_peer_req(struct peerd *peer)
283 xqindex idx = xq_pop_head(&peer->free_reqs, 1);
286 return peer->peer_reqs + idx;
/*
 * Return a peer_req slot to a free queue. The slot is identified by its index
 * inside peer->peer_reqs.
 *
 * Two append paths are visible: one to the free_thread_reqs queue of the
 * thread recorded in pr->thread_no, and one to the shared peer->free_reqs.
 * NOTE(review): the conditionals selecting between them (presumably MT vs.
 * non-MT builds) are not visible in this listing -- confirm.
 */
290 inline void free_peer_req(struct peerd *peer, struct peer_req *pr)
292 xqindex idx = pr - peer->peer_reqs;
295 struct thread *t = &peer->thread[pr->thread_no];
296 xq_append_head(&t->free_thread_reqs, idx, 1);
298 xq_append_head(&peer->free_reqs, idx, 1);
303 * Count all free reqs in peer.
304 * Racy, if multithreaded, but the sum should monotonically increase when checked
305 * after a termination signal is caught.
/*
 * Compare the number of free request slots with peer->nr_ops.
 *
 * The count is either the sum of every thread's free_thread_reqs queue or the
 * size of the shared peer->free_reqs queue.
 * NOTE(review): the conditionals choosing between the two counts and the
 * return statements are not visible in this listing; presumably the function
 * returns non-zero when every slot is free -- confirm.
 */
307 int all_peer_reqs_free(struct peerd *peer)
309 uint32_t free_reqs = 0;
312 for (i = 0; i < peer->nr_threads; i++) {
313 free_reqs += xq_count(&peer->thread[i].free_thread_reqs);
316 free_reqs = xq_count(&peer->free_reqs);
318 if (free_reqs == peer->nr_ops)
/*
 * Accumulators for time spent in xseg_respond() and a call counter, plus a
 * helper that prints them to stdout.
 * NOTE(review): the gettimeofday/timersub/timeradd calls in complete() that
 * would feed resp_accum are commented out, so the printed time stays at its
 * initial value unless it is updated elsewhere -- confirm.
 */
323 struct timeval resp_start, resp_end, resp_accum = {0, 0};
324 uint64_t responds = 0;
325 void get_responds_stats(){
326 printf("Time waiting respond %lu.%06lu sec for %llu times.\n",
327 //(unsigned int)(t - peer->thread),
328 resp_accum.tv_sec, resp_accum.tv_usec, (long long unsigned int) responds);
/*
 * Finish a peer request as failed.
 *
 * Marks pr->req with XS_FAILED, responds on pr->portno (X_ALLOC), signals the
 * port returned by xseg_respond(), releases the peer_req slot and wakes the
 * next thread.
 * NOTE(review): the lines between these statements are not visible in this
 * listing; the final wake-up may be limited to multi-threaded builds, and the
 * port returned by xseg_respond() may be checked before signalling -- confirm.
 */
332 void fail(struct peerd *peer, struct peer_req *pr)
334 struct xseg_request *req = pr->req;
337 XSEGLOG2(&lc, D, "failing req %u", (unsigned int) (pr - peer->peer_reqs));
338 req->state |= XS_FAILED;
339 //xseg_set_req_data(peer->xseg, pr->req, NULL);
340 p = xseg_respond(peer->xseg, req, pr->portno, X_ALLOC);
341 xseg_signal(peer->xseg, p);
343 free_peer_req(peer, pr);
345 wake_up_next_thread(peer);
/*
 * Finish a peer request as served.
 *
 * Mirrors fail(): marks pr->req with XS_SERVED, responds on pr->portno
 * (X_ALLOC), signals the port returned by xseg_respond(), releases the
 * peer_req slot and wakes the next thread. The timing instrumentation around
 * xseg_respond() is commented out.
 * NOTE(review): lines between these statements are missing from this listing;
 * the final wake-up may be limited to multi-threaded builds -- confirm.
 */
350 void complete(struct peerd *peer, struct peer_req *pr)
352 struct xseg_request *req = pr->req;
355 req->state |= XS_SERVED;
356 //xseg_set_req_data(peer->xseg, pr->req, NULL);
357 //gettimeofday(&resp_start, NULL);
358 p = xseg_respond(peer->xseg, req, pr->portno, X_ALLOC);
359 //gettimeofday(&resp_end, NULL);
361 //timersub(&resp_end, &resp_start, &resp_end);
362 //timeradd(&resp_end, &resp_accum, &resp_accum);
363 //printf("xseg_signal: %u\n", p);
364 xseg_signal(peer->xseg, p);
366 free_peer_req(peer, pr);
368 wake_up_next_thread(peer);
/*
 * Hand a freshly accepted request to the peer's dispatch() routine with
 * reason dispatch_accept.
 * NOTE(review): `xreq` is not used by any visible statement; lines of this
 * function are missing from the listing, so it may be used there -- confirm.
 */
372 static void handle_accepted(struct peerd *peer, struct peer_req *pr,
373 struct xseg_request *req)
375 struct xseg_request *xreq = pr->req;
376 //assert xreq == req;
377 XSEGLOG2(&lc, D, "Handle accepted");
379 //xreq->state = XS_ACCEPTED;
381 dispatch(peer, pr, req, dispatch_accept);
/*
 * Hand a received request (a reply to something this peer submitted, as used
 * by check_ports()) to dispatch() with reason dispatch_receive.
 */
384 static void handle_received(struct peerd *peer, struct peer_req *pr,
385 struct xseg_request *req)
387 //struct xseg_request *req = pr->req;
388 //assert req->state != XS_ACCEPTED;
389 XSEGLOG2(&lc, D, "Handle received \n");
390 dispatch(peer, pr, req, dispatch_receive);
/*
 * Accumulators for time spent in xseg_submit() and a call counter, plus a
 * helper that prints them to stdout.
 * NOTE(review): the timing calls in submit_peer_req() that would feed
 * sub_accum are commented out, so the printed time stays at its initial value
 * unless it is updated elsewhere -- confirm.
 */
393 struct timeval sub_start, sub_end, sub_accum = {0, 0};
394 uint64_t submits = 0;
395 void get_submits_stats(){
396 printf("Time waiting submit %lu.%06lu sec for %llu times.\n",
397 //(unsigned int)(t - peer->thread),
398 sub_accum.tv_sec, sub_accum.tv_usec, (long long unsigned int) submits);
/*
 * Submit pr->req to its destination through xseg.
 *
 * Stores `pr` as the request's private data (check_ports() reads it back with
 * xseg_get_req_data() when the reply arrives), submits the request from
 * pr->portno with X_ALLOC and signals the port returned by xseg_submit().
 * NOTE(review): the error checks after xseg_set_req_data() and xseg_submit()
 * and the return statements are not visible in this listing -- confirm the
 * return convention before relying on it.
 */
401 int submit_peer_req(struct peerd *peer, struct peer_req *pr)
404 struct xseg_request *req = pr->req;
405 // assert req->portno == peer->portno ?
406 //TODO small function with error checking
407 XSEGLOG2 (&lc, D, "submitting peer req %u\n", (unsigned int)(pr - peer->peer_reqs));
408 ret = xseg_set_req_data(peer->xseg, req, (void *)(pr));
411 //printf("pr: %x , req_data: %x \n", pr, xseg_get_req_data(peer->xseg, req));
412 //gettimeofday(&sub_start, NULL);
413 ret = xseg_submit(peer->xseg, req, pr->portno, X_ALLOC);
414 //gettimeofday(&sub_end, NULL);
416 //timersub(&sub_end, &sub_start, &sub_end);
417 //timeradd(&sub_end, &sub_accum, &sub_accum);
420 xseg_signal(peer->xseg, ret);
/*
 * Poll every bound port once (portno_start..portno_end inclusive) for work.
 *
 * Two signatures are visible, with and without a `struct thread *`; the
 * preprocessor conditionals selecting one are missing from this listing.
 *
 * For each port:
 *  - unless termination was requested, a peer_req slot is allocated and a
 *    non-blocking xseg_accept() is tried; an accepted request goes to
 *    handle_accepted(), otherwise the slot is released again;
 *  - a non-blocking xseg_receive() is tried; the owning peer_req is recovered
 *    from the request's private data and passed to handle_received(). A reply
 *    without such data is logged and responded to (or put back if that fails).
 *
 * NOTE(review): the branch conditions and the return value are on lines not
 * visible here; callers treat a non-zero result specially -- confirm meaning.
 */
425 int check_ports(struct peerd *peer, struct thread *t)
427 int check_ports(struct peerd *peer)
430 struct xseg *xseg = peer->xseg;
431 xport portno_start = peer->portno_start;
432 xport portno_end = peer->portno_end;
433 struct xseg_request *accepted, *received;
438 for (i = portno_start; i <= portno_end; i++) {
441 if (!isTerminate()) {
443 pr = alloc_peer_req(peer, t);
445 pr = alloc_peer_req(peer);
448 accepted = xseg_accept(xseg, i, X_NONBLOCK);
452 xseg_cancel_wait(xseg, i);
453 handle_accepted(peer, pr, accepted);
457 free_peer_req(peer, pr);
461 received = xseg_receive(xseg, i, X_NONBLOCK);
463 r = xseg_get_req_data(xseg, received, (void **) &pr);
465 XSEGLOG2(&lc, W, "Received request with no pr data\n");
466 xport p = xseg_respond(peer->xseg, received, peer->portno_start, X_ALLOC);
468 XSEGLOG2(&lc, W, "Could not respond stale request");
469 xseg_put_request(xseg, received, portno_start);
472 xseg_signal(xseg, p);
475 //maybe perform sanity check for pr
476 xseg_cancel_wait(xseg, i);
477 handle_received(peer, pr, received);
/*
 * Per-thread polling loop (thread entry point taking a `struct thread *`).
 *
 * Spins over check_ports() up to `threshold` times (1000 divided by the number
 * of bound ports), then sleeps in xseg_wait_signal() until signalled or the
 * timeout given there expires. On exit it wakes the next thread and calls
 * custom_peer_finalize().
 *
 * NOTE(review): the exit test counts peer->free_reqs, whereas the
 * two-argument alloc_peer_req() draws from the per-thread free_thread_reqs
 * queues; generic_peerd_loop() uses all_peer_reqs_free() instead -- confirm
 * which is intended here.
 * NOTE(review): what happens when check_ports() returns non-zero is on a line
 * not visible in this listing.
 */
487 static void* thread_loop(void *arg)
489 struct thread *t = (struct thread *) arg;
490 struct peerd *peer = t->peer;
491 struct xseg *xseg = peer->xseg;
492 xport portno_start = peer->portno_start;
493 xport portno_end = peer->portno_end;
494 pid_t pid =syscall(SYS_gettid);
496 uint64_t threshold=1000/(1 + portno_end - portno_start);
498 XSEGLOG2(&lc, D, "thread %u\n", (unsigned int) (t- peer->thread));
499 XSEGLOG2(&lc, I, "Thread %u has tid %u.\n", (unsigned int) (t- peer->thread), pid);
500 xseg_init_local_signal(xseg, peer->portno_start);
501 for (;!(isTerminate() && xq_count(&peer->free_reqs) == peer->nr_ops);) {
502 for(loops = threshold; loops > 0; loops--) {
504 xseg_prepare_wait(xseg, peer->portno_start);
505 if (check_ports(peer, t))
508 XSEGLOG2(&lc, I, "Thread %u goes to sleep\n", (unsigned int) (t- peer->thread));
509 xseg_wait_signal(xseg, 10000000UL);
510 xseg_cancel_wait(xseg, peer->portno_start);
511 XSEGLOG2(&lc, I, "Thread %u woke up\n", (unsigned int) (t- peer->thread));
513 wake_up_next_thread(peer);
514 custom_peer_finalize(peer);
/*
 * pthread entry point used by peerd_start_threads().
 *
 * Pins the calling thread to cpu_list.cpus[thread_num] (the conditions
 * guarding this are not visible in this listing), builds a "Thread <n>"
 * identifier stored in t->arg, publishes `t` under `threadkey`, runs
 * peer->peerd_loop(t) and finally wakes the next thread and calls
 * custom_peer_finalize().
 *
 * NOTE(review): after the counting loop `i` equals the string length, so
 * realloc(thread_id, i-1) keeps one byte fewer than the string without its
 * terminator and drops the last digit; realloc(thread_id, i) appears to match
 * the stated intent -- confirm. Neither malloc() nor realloc() is checked.
 * NOTE(review): pthread_setaffinity_np() reports failure through its return
 * value rather than errno, so perror() may print an unrelated message, and
 * the text names sched_setaffinity -- confirm against the man page.
 */
518 void *init_thread_loop(void *arg)
520 struct thread *t = (struct thread *) arg;
521 struct peerd *peer = t->peer;
522 pthread_t thread = pthread_self();
523 long int thread_num = t - t->peer->thread;
532 CPU_SET(cpu_list.cpus[thread_num], &mask);
533 r = pthread_setaffinity_np(thread, sizeof(mask), &mask);
535 perror("sched_setaffinity");
542 * We need an identifier for every thread that will spin in peerd_loop.
543 * The following code is a way to create a string of this format:
545 * minus the null terminator. What we do is we create this string with
546 * snprintf and then resize it to exclude the null terminator with
547 * realloc. Finally, the result string is passed to the (void *arg) field
550 * Since the highest thread number can't be more than 5 digits, using 13
551 * chars should be more than enough.
553 thread_id = malloc(13 * sizeof(char));
554 snprintf(thread_id, 13, "Thread %ld", thread_num);
555 for (i = 0; thread_id[i]; i++) {}
556 t->arg = (void *)realloc(thread_id, i-1);
557 pthread_setspecific(threadkey, t);
560 (void)peer->peerd_loop(t);
562 wake_up_next_thread(peer);
563 custom_peer_finalize(peer);
/*
 * Start peer->nr_threads worker threads and wait for all of them.
 *
 * Each thread slot gets its index and a back-pointer to `peer`, then a
 * pthread running init_thread_loop() is created for it. If the peer has an
 * interactive_func it is called on the calling thread before joining.
 * NOTE(review): no check of pthread_create()'s result is visible in this
 * listing, and the return statement is missing -- confirm.
 */
568 int peerd_start_threads(struct peerd *peer)
571 uint32_t nr_threads = peer->nr_threads;
573 for (i = 0; i < nr_threads; i++) {
574 peer->thread[i].thread_no = i;
575 peer->thread[i].peer = peer;
576 pthread_create(&peer->thread[i].tid, NULL,
577 init_thread_loop, (void *)(peer->thread + i));
580 if (peer->interactive_func)
581 peer->interactive_func();
582 for (i = 0; i < nr_threads; i++) {
583 pthread_join(peer->thread[i].tid, NULL);
/*
 * Forward pr->req to the peer's defer port and release the peer_req slot.
 *
 * Requires canDefer(peer); otherwise an error is logged. The request is
 * forwarded with xseg_forward() from pr->portno to peer->defer_portno and the
 * resulting port is signalled (a failed signal is only logged as a warning).
 * NOTE(review): the remaining xseg_forward() arguments, the failure tests and
 * the return values are on lines not visible in this listing -- confirm.
 */
590 int defer_request(struct peerd *peer, struct peer_req *pr)
594 if (!canDefer(peer)){
595 XSEGLOG2(&lc, E, "Peer cannot defer requests");
598 p = xseg_forward(peer->xseg, pr->req, peer->defer_portno, pr->portno,
601 XSEGLOG2(&lc, E, "Cannot defer request %lx", pr->req);
604 r = xseg_signal(peer->xseg, p);
606 XSEGLOG2(&lc, W, "Cannot signal port %lu", p);
608 free_peer_req(peer, pr);
613 * generic_peerd_loop is a general-purpose port-checker loop that is
614 * suitable both for multi-threaded and single-threaded peers.
/*
 * Default peerd_loop implementation (installed by peerd_init()).
 *
 * `arg` is interpreted either as a `struct thread *` or as a `struct peerd *`;
 * both interpretations are visible below, the selecting conditionals are not.
 * The loop runs until termination is requested and all_peer_reqs_free() holds.
 * Each round spins over check_ports() up to peer->threshold divided by the
 * number of bound ports, then sleeps in xseg_wait_signal().
 *
 * NOTE(review): `id` as declared here is a 4-byte array with no terminating
 * NUL, yet it is logged with "%s"; unless a line missing from this listing
 * accounts for that, the logger may read past the array -- confirm.
 * NOTE(review): what happens when check_ports() returns non-zero, and the
 * function's return value, are not visible here.
 */
616 static int generic_peerd_loop(void *arg)
619 struct thread *t = (struct thread *) arg;
620 struct peerd *peer = t->peer;
623 struct peerd *peer = (struct peerd *) arg;
624 char id[4] = {'P','e','e','r'};
626 struct xseg *xseg = peer->xseg;
627 xport portno_start = peer->portno_start;
628 xport portno_end = peer->portno_end;
629 pid_t pid = syscall(SYS_gettid);
630 uint64_t threshold = peer->threshold;
631 threshold /= (1 + portno_end - portno_start);
635 XSEGLOG2(&lc, I, "%s has tid %u.\n", id, pid);
636 xseg_init_local_signal(xseg, peer->portno_start);
637 //for (;!(isTerminate() && xq_count(&peer->free_reqs) == peer->nr_ops);) {
638 for (;!(isTerminate() && all_peer_reqs_free(peer));) {
639 //Heart of peerd_loop. This loop is common for everyone.
640 for(loops = threshold; loops > 0; loops--) {
642 xseg_prepare_wait(xseg, peer->portno_start);
644 if (check_ports(peer, t))
646 if (check_ports(peer))
656 XSEGLOG2(&lc, I, "%s goes to sleep\n", id);
657 xseg_wait_signal(xseg, 10000000UL);
658 xseg_cancel_wait(xseg, peer->portno_start);
659 XSEGLOG2(&lc, I, "%s woke up\n", id);
/*
 * Single-process entry: pin the process to cpu_list.cpus[0] with
 * sched_setaffinity(), run peer->peerd_loop(peer), then call
 * custom_peer_finalize() and tear down the local signal on portno_start.
 * NOTE(review): the conditions guarding the pinning and the return value are
 * on lines not visible in this listing -- confirm.
 */
664 static int init_peerd_loop(struct peerd *peer)
666 struct xseg *xseg = peer->xseg;
672 CPU_SET(cpu_list.cpus[0], &mask);
674 r = sched_setaffinity(0, sizeof(mask), &mask);
676 perror("sched_setaffinity");
681 peer->peerd_loop(peer);
682 custom_peer_finalize(peer);
683 xseg_quit_local_signal(xseg, peer->portno_start);
/*
 * Thread-style wrapper around init_peerd_loop() (passed to st_thread_create()
 * in main()): returns (void *)0 when init_peerd_loop() returns 0 and
 * (void *)-1 otherwise.
 */
689 static void * st_peerd_loop(void *peer)
691 struct peerd *peerd = peer;
693 return init_peerd_loop(peerd) ? (void *)-1 : (void *)0;
/*
 * Join the xseg segment described by `spec` as a PEER_TYPE peer.
 *
 * A first xseg_join() is attempted; the later statements create the segment
 * with xseg_create() and join again (presumably only when the first join
 * failed -- the test is on a line not visible in this listing).
 * The results of xseg_parse_spec() and xseg_create() are deliberately ignored.
 */
697 static struct xseg *join(char *spec)
699 struct xseg_config config;
702 (void)xseg_parse_spec(spec, &config);
703 xseg = xseg_join(config.type, config.name, PEER_TYPE, NULL);
707 (void)xseg_create(&config);
708 return xseg_join(config.type, config.name, PEER_TYPE, NULL);
/*
 * Allocate and initialise a peer.
 *
 * Steps visible in this listing:
 *  - allocate the peerd and record nr_ops, defer_portno, threshold, nr_threads;
 *  - allocate the thread array, the `threads` queue and one free_thread_reqs
 *    queue per thread, spreading the slot indices round-robin over the threads,
 *    and create `threadkey`;
 *  - or allocate the shared free_reqs queue, lowering peer->nr_ops to the
 *    queue size when that is smaller;
 *  - allocate peer_reqs, initialise the xseg library and join the segment;
 *  - bind every port in [portno_start, portno_end]; the signal descriptor of
 *    the first bound port is reused for the others;
 *  - reset every peer_req and install generic_peerd_loop as peerd_loop.
 *
 * NOTE(review): the rest of the parameter list (it includes `threshold`, used
 * below), the conditionals separating the per-thread and shared-queue setup,
 * all failure paths and the return statement are not visible in this listing.
 * NOTE(review): peer_reqs is sized and initialised with the `nr_ops` argument,
 * not with the possibly reduced peer->nr_ops.
 */
711 static struct peerd* peerd_init(uint32_t nr_ops, char* spec, long portno_start,
712 long portno_end, uint32_t nr_threads, xport defer_portno,
717 struct xseg_port *port;
724 peer = malloc(sizeof(struct peerd));
729 peer->nr_ops = nr_ops;
730 peer->defer_portno = defer_portno;
731 peer->threshold = threshold;
733 peer->nr_threads = nr_threads;
734 peer->thread = calloc(nr_threads, sizeof(struct thread));
737 if (!xq_alloc_empty(&peer->threads, nr_threads))
739 for (i = 0; i < nr_threads; i++) {
740 if (!xq_alloc_empty(&peer->thread[i].free_thread_reqs, nr_ops))
743 for (i = 0; i < nr_ops; i++) {
744 __xq_append_head(&peer->thread[i % nr_threads].free_thread_reqs, (xqindex)i);
747 pthread_key_create(&threadkey, NULL);
749 if (!xq_alloc_seq(&peer->free_reqs, nr_ops, nr_ops))
751 if (peer->free_reqs.size < peer->nr_ops)
752 peer->nr_ops = peer->free_reqs.size;
754 peer->peer_reqs = calloc(nr_ops, sizeof(struct peer_req));
755 if (!peer->peer_reqs){
760 if (xseg_initialize()){
761 printf("cannot initialize library\n");
764 peer->xseg = join(spec);
768 peer->portno_start = (xport) portno_start;
769 peer->portno_end= (xport) portno_end;
772 * Start binding ports from portno_start to portno_end.
773 * The first port we bind will have its signal_desc initialized by xseg
774 * and the same signal_desc will be used for all the other ports.
776 for (p = peer->portno_start; p <= peer->portno_end; p++) {
777 port = xseg_bind_port(peer->xseg, p, sd);
779 printf("cannot bind to port %u\n", (unsigned int) p);
782 if (p == peer->portno_start)
783 sd = xseg_get_signal_desc(peer->xseg, port);
786 printf("Peer on ports %u-%u\n", peer->portno_start, peer->portno_end);
788 for (i = 0; i < nr_ops; i++) {
789 peer->peer_reqs[i].peer = peer;
790 peer->peer_reqs[i].req = NULL;
791 peer->peer_reqs[i].retval = 0;
792 peer->peer_reqs[i].priv = NULL;
793 peer->peer_reqs[i].portno = NoPort;
795 peer->peer_reqs[i].cond = st_cond_new(); //FIXME err check
799 //Plug default peerd_loop. This can change later on by custom_peer_init.
800 peer->peerd_loop = generic_peerd_loop;
803 peer->interactive_func = NULL;
/*
 * Remove the pidfile at `path`; returns the result of unlink().
 * NOTE(review): `fd` is not used by any visible statement (presumably it is
 * closed on a line missing from this listing) -- confirm.
 */
808 int pidfile_remove(char *path, int fd)
811 return (unlink(path));
/*
 * Write the value returned by syscall(SYS_gettid), formatted in decimal, at
 * the start of the already opened pidfile `pid_fd`.
 * NOTE(review): the declaration of `buf`, any check of `ret` and the return
 * statement are on lines not visible in this listing -- confirm.
 */
814 int pidfile_write(int pid_fd)
817 snprintf(buf, sizeof(buf), "%ld", syscall(SYS_gettid));
820 lseek(pid_fd, 0, SEEK_SET);
821 int ret = write(pid_fd, buf, strlen(buf));
/*
 * Read a decimal pid from the file at `path` into *pid.
 *
 * At most 15 bytes are read and parsed with strtol(). The test on `endptr`
 * checks whether the parse stopped exactly at buf[ret]; bytes read beyond the
 * digits (e.g. a trailing newline) make that test fire.
 * NOTE(review): error handling for open()/read(), termination of `buf`,
 * closing of `fd` and the return values are not visible in this listing.
 */
825 int pidfile_read(char *path, pid_t *pid)
827 char buf[16], *endptr;
830 int fd = open(path, O_RDONLY);
833 int ret = read(fd, buf, 15);
839 *pid = strtol(buf, &endptr, 10);
840 if (endptr != &buf[ret]){
/*
 * Create the pidfile exclusively (O_CREAT|O_EXCL, owner read/write).
 * pidfile_read() is used to load the pid stored in an existing file into
 * *old_pid (presumably only when the exclusive create fails -- the test is on
 * a line not visible in this listing, as is the return statement).
 */
848 int pidfile_open(char *path, pid_t *old_pid)
851 int fd = open(path, O_CREAT|O_EXCL|O_WRONLY, S_IRUSR|S_IWUSR);
854 pidfile_read(path, old_pid);
/*
 * Parse a comma-separated list of CPU numbers from `cpus` into
 * cpu_list->cpus. `cpus` is modified in place by strtok().
 *
 * NOTE(review): strtok() is passed `cpus` on every iteration; the loop only
 * advances to the next token if a line missing from this listing resets
 * `cpus` to NULL after the first call -- confirm.
 * NOTE(review): no bound check of the index against the capacity of
 * cpu_list->cpus is visible, nor is the assignment of cpu_list->len or the
 * return value.
 */
859 int get_cpu_list(char *cpus, struct cpu_list *cpu_list)
864 while ((tok = strtok(cpus, ",")) != NULL) {
865 cpu_list->cpus[i++] = strtol(tok, &rem, 10);
866 if (strlen(rem) > 0) /* Not a number */
/*
 * Print the general command-line help to stderr. `argv0` is the program name
 * shown in the first line. Some rows of the table (and the end of the call)
 * are on lines not visible in this listing.
 */
876 void usage(char *argv0)
878 fprintf(stderr, "Usage: %s [general options] [custom peer options]\n\n", argv0);
879 fprintf(stderr, "General peer options:\n"
880 " Option | Default | \n"
881 " --------------------------------------------\n"
882 " -g | None | Segment spec to join\n"
883 " -sp | NoPort | Start portno to bind\n"
884 " -ep | NoPort | End portno to bind\n"
885 " -p | NoPort | Portno to bind\n"
886 " -n | 16 | Number of ops\n"
887 " -v | 0 | Verbosity level\n"
888 " -l | None | Logfile \n"
889 " -d | No | Daemonize \n"
890 " --pidfile | None | Pidfile \n"
892 " -t | No | Number of threads \n"
894 " --cpus | No | Coma-separated list of CPUs\n"
895 " | | to pin the process or threads\n"
/*
 * Program entry point.
 *
 * Order of operations visible in this listing:
 *  1. parse the general options with the READ_ARG_* macros;
 *  2. initialise logging, redirecting stdout and stderr;
 *  3. open the pidfile, reporting an already running daemon or a failure;
 *  4. close stdin and daemonize, then write the pidfile;
 *  5. parse --cpus and check it against the thread count (or require exactly
 *     one CPU for a single process);
 *  6. resolve the port range from -p or -sp/-ep and require both ends;
 *  7. peerd_init(), setup_signals(), custom_peer_init();
 *  8. run the peer: peerd_start_threads(), an st thread running
 *     st_peerd_loop(), or init_peerd_loop() directly;
 *  9. remove the pidfile.
 *
 * NOTE(review): the conditions guarding most of these steps, the
 * initialisation of the option buffers, and every exit path are on lines not
 * visible in this listing, so which steps are optional cannot be stated here.
 * NOTE(review): READ_ARG_ULONG is applied to variables of differing types
 * (long, uint32_t, unsigned int, xport, uint64_t); confirm the macro handles
 * each width correctly.
 */
901 int main(int argc, char *argv[])
903 struct peerd *peer = NULL;
906 long portno_start = -1, portno_end = -1, portno = -1;
909 int daemonize = 0, help = 0;
910 uint32_t nr_ops = 16;
911 uint32_t nr_threads = 1;
912 uint64_t threshold = 1000;
913 unsigned int debug_level = 0;
914 xport defer_portno = NoPort;
918 char spec[MAX_SPEC_LEN + 1];
919 char logfile[MAX_LOGFILE_LEN + 1];
920 char pidfile[MAX_PIDFILE_LEN + 1];
921 char cpus[MAX_CPUS_LEN + 1];
928 //capture here -g spec, -n nr_ops, -p portno, -t nr_threads -v verbose level
929 // -dp xseg_portno to defer blocking requests
931 //TODO print messages on arg parsing error
932 BEGIN_READ_ARGS(argc, argv);
933 READ_ARG_STRING("-g", spec, MAX_SPEC_LEN);
934 READ_ARG_ULONG("-sp", portno_start);
935 READ_ARG_ULONG("-ep", portno_end);
936 READ_ARG_ULONG("-p", portno);
937 READ_ARG_ULONG("-n", nr_ops);
938 READ_ARG_ULONG("-v", debug_level);
940 READ_ARG_ULONG("-t", nr_threads);
942 READ_ARG_ULONG("-dp", defer_portno);
943 READ_ARG_STRING("-l", logfile, MAX_LOGFILE_LEN);
944 READ_ARG_BOOL("-d", daemonize);
945 READ_ARG_BOOL("-h", help);
946 READ_ARG_BOOL("--help", help);
947 READ_ARG_ULONG("--threshold", threshold);
948 READ_ARG_STRING("--cpus", cpus, MAX_CPUS_LEN);
949 READ_ARG_STRING("--pidfile", pidfile, MAX_PIDFILE_LEN);
957 r = init_logctx(&lc, argv[0], debug_level, logfile,
958 REDIRECT_STDOUT|REDIRECT_STDERR);
960 XSEGLOG("Cannot initialize logging to logfile");
963 XSEGLOG2(&lc, D, "Main thread has tid %ld.\n", syscall(SYS_gettid));
966 pid_fd = pidfile_open(pidfile, &old_pid);
969 XSEGLOG2(&lc, E, "Daemon already running, pid: %d.", old_pid);
971 XSEGLOG2(&lc, E, "Cannot open or create pidfile");
978 if (close(STDIN_FILENO)){
979 XSEGLOG2(&lc, W, "Could not close stdin");
981 if (daemon(0, 1) < 0){
982 XSEGLOG2(&lc, E, "Cannot daemonize");
988 pidfile_write(pid_fd);
991 r = get_cpu_list(cpus, &cpu_list);
994 XSEGLOG2(&lc, E, "--cpus %s: Invalid input", cpus);
998 if (nr_threads != cpu_list.len) {
999 XSEGLOG2(&lc, E, "--cpus %s: Number of "
1000 "threads (%d) and CPUs don't match",
1005 if (cpu_list.len != 1) {
1006 XSEGLOG2(&lc, E, "--cpus %s: Too many CPUs for a "
1007 "single process", cpus);
1014 //TODO perform argument sanity checks
1015 verbose = debug_level;
1017 portno_start = portno;
1018 portno_end = portno;
1020 if (portno_start == -1 || portno_end == -1){
1021 XSEGLOG2(&lc, E, "Portno or {portno_start, portno_end} must be supplied");
1027 peer = peerd_init(nr_ops, spec, portno_start, portno_end, nr_threads, defer_portno, threshold);
1032 setup_signals(peer);
1033 r = custom_peer_init(peer, argc, argv);
1038 peerd_start_threads(peer);
1039 #elif defined(ST_THREADS)
1040 st_thread_t st = st_thread_create(st_peerd_loop, peer, 1, 0);
1041 r = st_thread_join(st, NULL);
1043 r = init_peerd_loop(peer);
1047 pidfile_remove(pidfile, pid_fd);