2 * Copyright 2012 GRNET S.A. All rights reserved.
4 * Redistribution and use in source and binary forms, with or
5 * without modification, are permitted provided that the following
8 * 1. Redistributions of source code must retain the above
9 * copyright notice, this list of conditions and the following
11 * 2. Redistributions in binary form must reproduce the above
12 * copyright notice, this list of conditions and the following
13 * disclaimer in the documentation and/or other materials
14 * provided with the distribution.
16 * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17 * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19 * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20 * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23 * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24 * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26 * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27 * POSSIBILITY OF SUCH DAMAGE.
29 * The views and conclusions contained in the software and
30 * documentation are those of the authors and should not be
31 * interpreted as representing official policies, either expressed
32 * or implied, of GRNET S.A.
38 #include <sys/types.h>
40 #include <sys/syscall.h>
50 #include <xseg/xseg.h>
54 #define PEER_TYPE "pthread"
56 #define PEER_TYPE "posix"
59 //FIXME this should not be defined here probably
60 #define MAX_SPEC_LEN 128
61 #define MAX_PIDFILE_LEN 512
63 volatile unsigned int terminated = 0;
64 unsigned int verbose = 0;
71 struct peerd *global_peer;
80 void (*func)(void *arg);
83 struct xq free_thread_reqs;
86 inline static struct thread* alloc_thread(struct peerd *peer)
88 xqindex idx = xq_pop_head(&peer->threads, 1);
91 return peer->thread + idx;
94 inline static void free_thread(struct peerd *peer, struct thread *t)
96 xqindex idx = t - peer->thread;
97 xq_append_head(&peer->threads, idx, 1);
101 inline static void __wake_up_thread(struct thread *t)
103 pthread_mutex_lock(&t->lock);
104 pthread_cond_signal(&t->cond);
105 pthread_mutex_unlock(&t->lock);
108 inline static void wake_up_thread(struct thread* t)
115 inline static int wake_up_next_thread(struct peerd *peer)
117 return (xseg_signal(peer->xseg, peer->portno_start));
122 * extern is needed if this function is going to be called by another file
123 * such as bench-xseg.c
126 void signal_handler(int signal)
128 XSEGLOG2(&lc, I, "Caught signal. Terminating gracefully");
131 wake_up_next_thread(global_peer);
135 void renew_logfile(int signal)
137 XSEGLOG2(&lc, I, "Caught signal. Renewing logfile");
138 renew_logctx(&lc, NULL, verbose, NULL, REOPEN_FILE);
141 static int setup_signals(struct peerd *peer)
148 sigemptyset(&sa.sa_mask);
150 sa.sa_handler = signal_handler;
151 r = sigaction(SIGTERM, &sa, NULL);
154 r = sigaction(SIGINT, &sa, NULL);
157 r = sigaction(SIGQUIT, &sa, NULL);
161 sa.sa_handler = renew_logfile;
162 r = sigaction(SIGUSR1, &sa, NULL);
169 inline int canDefer(struct peerd *peer)
171 return !(peer->defer_portno == NoPort);
174 void print_req(struct xseg *xseg, struct xseg_request *req)
176 char target[64], data[64];
177 char *req_target, *req_data;
178 unsigned int end = (req->targetlen> 63) ? 63 : req->targetlen;
179 req_target = xseg_get_target(xseg, req);
180 req_data = xseg_get_data(xseg, req);
183 strncpy(target, req_target, end);
185 strncpy(data, req_data, 63);
187 printf("req id:%lu, op:%u %llu:%lu serviced: %lu, reqstate: %u\n"
188 "src: %u, transit: %u, dst: %u effective dst: %u\n"
189 "target[%u]:'%s', data[%llu]:\n%s------------------\n\n",
190 (unsigned long)(req),
191 (unsigned int)req->op,
192 (unsigned long long)req->offset,
193 (unsigned long)req->size,
194 (unsigned long)req->serviced,
195 (unsigned int)req->state,
196 (unsigned int)req->src_portno,
197 (unsigned int)req->transit_portno,
198 (unsigned int)req->dst_portno,
199 (unsigned int)req->effective_dst_portno,
200 (unsigned int)req->targetlen, target,
201 (unsigned long long)req->datalen, data);
204 void log_pr(char *msg, struct peer_req *pr)
206 char target[64], data[64];
207 char *req_target, *req_data;
208 struct peerd *peer = pr->peer;
209 struct xseg *xseg = pr->peer->xseg;
210 req_target = xseg_get_target(xseg, pr->req);
211 req_data = xseg_get_data(xseg, pr->req);
212 /* null terminate name in case of req->target is less than 63 characters,
213 * and next character after name (aka first byte of next buffer) is not
216 unsigned int end = (pr->req->targetlen> 63) ? 63 : pr->req->targetlen;
218 strncpy(target, req_target, end);
220 strncpy(data, req_data, 63);
222 printf("%s: req id:%u, op:%u %llu:%lu serviced: %lu, retval: %lu, reqstate: %u\n"
223 "target[%u]:'%s', data[%llu]:\n%s------------------\n\n",
225 (unsigned int)(pr - peer->peer_reqs),
226 (unsigned int)pr->req->op,
227 (unsigned long long)pr->req->offset,
228 (unsigned long)pr->req->size,
229 (unsigned long)pr->req->serviced,
230 (unsigned long)pr->retval,
231 (unsigned int)pr->req->state,
232 (unsigned int)pr->req->targetlen, target,
233 (unsigned long long)pr->req->datalen, data);
238 inline struct peer_req *alloc_peer_req(struct peerd *peer, struct thread *t)
242 xqindex idx = xq_pop_head(&t->free_thread_reqs, t->thread_no);
245 /* try to steal from the next thread */
246 nt = &peer->thread[(t->thread_no + 1) % peer->nr_threads];
247 idx = xq_pop_head(&nt->free_thread_reqs, t->thread_no);
251 pr = peer->peer_reqs + idx;
252 pr->thread_no = t - peer->thread;
257 * free_reqs is a queue that simply contains pointer offsets to the peer_reqs
258 * queue. If a pointer from peer_reqs is popped, we are certain that the
259 * associated memory in peer_reqs is free to use
261 inline struct peer_req *alloc_peer_req(struct peerd *peer)
263 xqindex idx = xq_pop_head(&peer->free_reqs, 1);
266 return peer->peer_reqs + idx;
270 inline void free_peer_req(struct peerd *peer, struct peer_req *pr)
272 xqindex idx = pr - peer->peer_reqs;
275 struct thread *t = &peer->thread[pr->thread_no];
276 xq_append_head(&t->free_thread_reqs, idx, 1);
278 xq_append_head(&peer->free_reqs, idx, 1);
282 struct timeval resp_start, resp_end, resp_accum = {0, 0};
283 uint64_t responds = 0;
284 void get_responds_stats(){
285 printf("Time waiting respond %lu.%06lu sec for %llu times.\n",
286 //(unsigned int)(t - peer->thread),
287 resp_accum.tv_sec, resp_accum.tv_usec, (long long unsigned int) responds);
291 void fail(struct peerd *peer, struct peer_req *pr)
293 struct xseg_request *req = pr->req;
296 XSEGLOG2(&lc, D, "failing req %u", (unsigned int) (pr - peer->peer_reqs));
297 req->state |= XS_FAILED;
298 //xseg_set_req_data(peer->xseg, pr->req, NULL);
299 p = xseg_respond(peer->xseg, req, pr->portno, X_ALLOC);
300 xseg_signal(peer->xseg, p);
302 free_peer_req(peer, pr);
304 wake_up_next_thread(peer);
309 void complete(struct peerd *peer, struct peer_req *pr)
311 struct xseg_request *req = pr->req;
314 req->state |= XS_SERVED;
315 //xseg_set_req_data(peer->xseg, pr->req, NULL);
316 //gettimeofday(&resp_start, NULL);
317 p = xseg_respond(peer->xseg, req, pr->portno, X_ALLOC);
318 //gettimeofday(&resp_end, NULL);
320 //timersub(&resp_end, &resp_start, &resp_end);
321 //timeradd(&resp_end, &resp_accum, &resp_accum);
322 //printf("xseg_signal: %u\n", p);
323 xseg_signal(peer->xseg, p);
325 free_peer_req(peer, pr);
327 wake_up_next_thread(peer);
331 static void handle_accepted(struct peerd *peer, struct peer_req *pr,
332 struct xseg_request *req)
334 struct xseg_request *xreq = pr->req;
335 //assert xreq == req;
336 XSEGLOG2(&lc, D, "Handle accepted");
338 //xreq->state = XS_ACCEPTED;
340 dispatch(peer, pr, req, dispatch_accept);
343 static void handle_received(struct peerd *peer, struct peer_req *pr,
344 struct xseg_request *req)
346 //struct xseg_request *req = pr->req;
347 //assert req->state != XS_ACCEPTED;
348 XSEGLOG2(&lc, D, "Handle received \n");
349 dispatch(peer, pr, req, dispatch_receive);
352 struct timeval sub_start, sub_end, sub_accum = {0, 0};
353 uint64_t submits = 0;
354 void get_submits_stats(){
355 printf("Time waiting submit %lu.%06lu sec for %llu times.\n",
356 //(unsigned int)(t - peer->thread),
357 sub_accum.tv_sec, sub_accum.tv_usec, (long long unsigned int) submits);
360 int submit_peer_req(struct peerd *peer, struct peer_req *pr)
363 struct xseg_request *req = pr->req;
364 // assert req->portno == peer->portno ?
365 //TODO small function with error checking
366 XSEGLOG2 (&lc, D, "submitting peer req %u\n", (unsigned int)(pr - peer->peer_reqs));
367 ret = xseg_set_req_data(peer->xseg, req, (void *)(pr));
370 //printf("pr: %x , req_data: %x \n", pr, xseg_get_req_data(peer->xseg, req));
371 //gettimeofday(&sub_start, NULL);
372 ret = xseg_submit(peer->xseg, req, pr->portno, X_ALLOC);
373 //gettimeofday(&sub_end, NULL);
375 //timersub(&sub_end, &sub_start, &sub_end);
376 //timeradd(&sub_end, &sub_accum, &sub_accum);
379 xseg_signal(peer->xseg, ret);
384 int check_ports(struct peerd *peer, struct thread *t)
386 int check_ports(struct peerd *peer)
389 struct xseg *xseg = peer->xseg;
390 xport portno_start = peer->portno_start;
391 xport portno_end = peer->portno_end;
392 struct xseg_request *accepted, *received;
397 for (i = portno_start; i <= portno_end; i++) {
400 //Shouldn't we just leave?
401 if (!isTerminate()) {
402 //Better way than alloc/free all the time?
403 //Cache the allocated peer_req?
405 pr = alloc_peer_req(peer, t);
407 pr = alloc_peer_req(peer);
410 accepted = xseg_accept(xseg, i, X_NONBLOCK);
414 xseg_cancel_wait(xseg, i);
415 handle_accepted(peer, pr, accepted);
419 free_peer_req(peer, pr);
423 received = xseg_receive(xseg, i, X_NONBLOCK);
425 r = xseg_get_req_data(xseg, received, (void **) &pr);
427 XSEGLOG2(&lc, W, "Received request with no pr data\n");
428 xport p = xseg_respond(peer->xseg, received, peer->portno_start, X_ALLOC);
430 XSEGLOG2(&lc, W, "Could not respond stale request");
431 xseg_put_request(xseg, received, portno_start);
434 xseg_signal(xseg, p);
437 //maybe perform sanity check for pr
438 xseg_cancel_wait(xseg, i);
439 handle_received(peer, pr, received);
449 static void* thread_loop(void *arg)
451 struct thread *t = (struct thread *) arg;
452 struct peerd *peer = t->peer;
453 struct xseg *xseg = peer->xseg;
454 xport portno_start = peer->portno_start;
455 xport portno_end = peer->portno_end;
456 pid_t pid =syscall(SYS_gettid);
458 uint64_t threshold=1000/(1 + portno_end - portno_start);
460 XSEGLOG2(&lc, D, "thread %u\n", (unsigned int) (t- peer->thread));
461 XSEGLOG2(&lc, I, "Thread %u has tid %u.\n", (unsigned int) (t- peer->thread), pid);
462 xseg_init_local_signal(xseg, peer->portno_start);
463 for (;!(isTerminate() && xq_count(&peer->free_reqs) == peer->nr_ops);) {
464 for(loops = threshold; loops > 0; loops--) {
466 xseg_prepare_wait(xseg, peer->portno_start);
467 if (check_ports(peer, t))
470 XSEGLOG2(&lc, I, "Thread %u goes to sleep\n", (unsigned int) (t- peer->thread));
471 xseg_wait_signal(xseg, 10000000UL);
472 xseg_cancel_wait(xseg, peer->portno_start);
473 XSEGLOG2(&lc, I, "Thread %u woke up\n", (unsigned int) (t- peer->thread));
475 wake_up_next_thread(peer);
476 custom_peer_finalize(peer);
480 void *init_thread_loop(void *arg)
482 struct thread *t = (struct thread *) arg;
483 struct peerd *peer = t->peer;
488 * We need an identifier for every thread that will spin in peerd_loop.
489 * The following code is a way to create a string of this format:
491 * minus the null terminator. What we do is we create this string with
492 * snprintf and then resize it to exclude the null terminator with
493 * realloc. Finally, the result string is passed to the (void *arg) field
496 * Since the highest thread number can't be more than 5 digits, using 13
497 * chars should be more than enough.
499 thread_id = malloc(13 * sizeof(char));
500 snprintf(thread_id, 13, "Thread %ld", t - t->peer->thread);
501 for (i = 0; thread_id[i]; i++) {}
502 t->arg = (void *)realloc(thread_id, i-1);
505 (void)peer->peerd_loop(t);
507 wake_up_next_thread(peer);
508 custom_peer_finalize(peer);
513 int peerd_start_threads(struct peerd *peer)
516 uint32_t nr_threads = peer->nr_threads;
518 for (i = 0; i < nr_threads; i++) {
519 peer->thread[i].func = NULL;
520 peer->thread[i].arg = NULL;
521 peer->thread[i].peer = peer;
522 pthread_cond_init(&peer->thread[i].cond,NULL);
523 pthread_mutex_init(&peer->thread[i].lock, NULL);
524 pthread_create(&peer->thread[i].tid, NULL,
525 init_thread_loop, (void *)(peer->thread + i));
528 if (peer->interactive_func)
529 peer->interactive_func();
530 for (i = 0; i < nr_threads; i++) {
531 pthread_join(peer->thread[i].tid, NULL);
538 int defer_request(struct peerd *peer, struct peer_req *pr)
542 if (!canDefer(peer)){
543 XSEGLOG2(&lc, E, "Peer cannot defer requests");
546 p = xseg_forward(peer->xseg, pr->req, peer->defer_portno, pr->portno,
549 XSEGLOG2(&lc, E, "Cannot defer request %lx", pr->req);
552 r = xseg_signal(peer->xseg, p);
554 XSEGLOG2(&lc, W, "Cannot signal port %lu", p);
556 free_peer_req(peer, pr);
561 * generic_peerd_loop is a general-purpose port-checker loop that is
562 * suitable both for multi-threaded and single-threaded peers.
564 static int generic_peerd_loop(void *arg)
567 struct thread *t = (struct thread *) arg;
568 struct peerd *peer = t->peer;
571 struct peerd *peer = (struct peerd *) arg;
572 char id[4] = {'P','e','e','r'};
574 struct xseg *xseg = peer->xseg;
575 xport portno_start = peer->portno_start;
576 xport portno_end = peer->portno_end;
577 pid_t pid = syscall(SYS_gettid);
578 uint64_t threshold=1000/(1 + portno_end - portno_start);
581 XSEGLOG2(&lc, I, "%s has tid %u.\n", id, pid);
582 xseg_init_local_signal(xseg, peer->portno_start);
583 for (;!(isTerminate() && xq_count(&peer->free_reqs) == peer->nr_ops);) {
586 XSEGLOG2(&lc, D, "%s executes function\n", id);
587 xseg_cancel_wait(xseg, peer->portno_start);
594 //Heart of peerd_loop. This loop is common for everyone.
595 for(loops = threshold; loops > 0; loops--) {
597 xseg_prepare_wait(xseg, peer->portno_start);
599 if (check_ports(peer, t))
601 if (check_ports(peer))
611 XSEGLOG2(&lc, I, "%s goes to sleep\n", id);
612 xseg_wait_signal(xseg, 10000000UL);
613 xseg_cancel_wait(xseg, peer->portno_start);
614 XSEGLOG2(&lc, I, "%s woke up\n", id);
619 static int init_peerd_loop(struct peerd *peer)
621 struct xseg *xseg = peer->xseg;
623 peer->peerd_loop(peer);
624 custom_peer_finalize(peer);
625 xseg_quit_local_signal(xseg, peer->portno_start);
630 static struct xseg *join(char *spec)
632 struct xseg_config config;
635 (void)xseg_parse_spec(spec, &config);
636 xseg = xseg_join(config.type, config.name, PEER_TYPE, NULL);
640 (void)xseg_create(&config);
641 return xseg_join(config.type, config.name, PEER_TYPE, NULL);
644 static struct peerd* peerd_init(uint32_t nr_ops, char* spec, long portno_start,
645 long portno_end, uint32_t nr_threads, xport defer_portno)
649 struct xseg_port *port;
656 peer = malloc(sizeof(struct peerd));
661 peer->nr_ops = nr_ops;
662 peer->defer_portno = defer_portno;
664 peer->nr_threads = nr_threads;
665 peer->thread = calloc(nr_threads, sizeof(struct thread));
668 if (!xq_alloc_empty(&peer->threads, nr_threads))
670 for (i = 0; i < nr_threads; i++) {
671 peer->thread[i].thread_no = i;
672 if (!xq_alloc_empty(&peer->thread[i].free_thread_reqs, nr_ops/nr_threads))
675 for (i = 0; i < nr_ops; i++) {
676 __xq_append_head(&peer->thread[i % nr_threads].free_thread_reqs, (xqindex)i);
680 if (!xq_alloc_seq(&peer->free_reqs, nr_ops, nr_ops))
683 peer->peer_reqs = calloc(nr_ops, sizeof(struct peer_req));
684 if (!peer->peer_reqs){
689 if (xseg_initialize()){
690 printf("cannot initialize library\n");
693 peer->xseg = join(spec);
697 peer->portno_start = (xport) portno_start;
698 peer->portno_end= (xport) portno_end;
701 * Start binding ports from portno_start to portno_end.
702 * The first port we bind will have its signal_desc initialized by xseg
703 * and the same signal_desc will be used for all the other ports.
705 for (p = peer->portno_start; p <= peer->portno_end; p++) {
706 port = xseg_bind_port(peer->xseg, p, sd);
708 printf("cannot bind to port %u\n", (unsigned int) p);
711 if (p == peer->portno_start)
712 sd = xseg_get_signal_desc(peer->xseg, port);
715 printf("Peer on ports %u-%u\n", peer->portno_start, peer->portno_end);
717 for (i = 0; i < nr_ops; i++) {
718 peer->peer_reqs[i].peer = peer;
719 peer->peer_reqs[i].req = NULL;
720 peer->peer_reqs[i].retval = 0;
721 peer->peer_reqs[i].priv = NULL;
722 peer->peer_reqs[i].portno = NoPort;
724 //Plug default peerd_loop. This can change later on by custom_peer_init.
725 peer->peerd_loop = generic_peerd_loop;
728 peer->peer_reqs[i].cond = st_cond_new(); //FIXME err check
732 peer->interactive_func = NULL;
738 int pidfile_remove(char *path, int fd)
741 return (unlink(path));
744 int pidfile_write(int pid_fd)
747 snprintf(buf, sizeof(buf), "%ld", syscall(SYS_gettid));
750 lseek(pid_fd, 0, SEEK_SET);
751 int ret = write(pid_fd, buf, strlen(buf));
755 int pidfile_read(char *path, pid_t *pid)
757 char buf[16], *endptr;
760 int fd = open(path, O_RDONLY);
763 int ret = read(fd, buf, 15);
769 *pid = strtol(buf, &endptr, 10);
770 if (endptr != &buf[ret]){
778 int pidfile_open(char *path, pid_t *old_pid)
781 int fd = open(path, O_CREAT|O_EXCL|O_WRONLY, S_IWUSR);
784 pidfile_read(path, old_pid);
789 void usage(char *argv0)
791 fprintf(stderr, "Usage: %s [general options] [custom peer options]\n\n", argv0);
792 fprintf(stderr, "General peer options:\n"
793 " Option | Default | \n"
794 " --------------------------------------------\n"
795 " -g | None | Segment spec to join\n"
796 " -sp | NoPort | Start portno to bind\n"
797 " -ep | NoPort | End portno to bind\n"
798 " -p | NoPort | Portno to bind\n"
799 " -n | 16 | Number of ops\n"
800 " -v | 0 | Verbosity level\n"
801 " -l | None | Logfile \n"
802 " -d | No | Daemonize \n"
803 " --pidfile | None | Pidfile \n"
805 " -t | No | Number of threads \n"
812 int main(int argc, char *argv[])
814 struct peerd *peer = NULL;
817 long portno_start = -1, portno_end = -1, portno = -1;
820 int daemonize = 0, help = 0;
821 uint32_t nr_ops = 16;
822 uint32_t nr_threads = 1;
823 unsigned int debug_level = 0;
824 xport defer_portno = NoPort;
828 char spec[MAX_SPEC_LEN + 1];
829 char logfile[MAX_LOGFILE_LEN + 1];
830 char pidfile[MAX_PIDFILE_LEN + 1];
836 //capture here -g spec, -n nr_ops, -p portno, -t nr_threads -v verbose level
837 // -dp xseg_portno to defer blocking requests
839 //TODO print messages on arg parsing error
840 BEGIN_READ_ARGS(argc, argv);
841 READ_ARG_STRING("-g", spec, MAX_SPEC_LEN);
842 READ_ARG_ULONG("-sp", portno_start);
843 READ_ARG_ULONG("-ep", portno_end);
844 READ_ARG_ULONG("-p", portno);
845 READ_ARG_ULONG("-n", nr_ops);
846 READ_ARG_ULONG("-v", debug_level);
848 READ_ARG_ULONG("-t", nr_threads);
850 READ_ARG_ULONG("-dp", defer_portno);
851 READ_ARG_STRING("-l", logfile, MAX_LOGFILE_LEN);
852 READ_ARG_BOOL("-d", daemonize);
853 READ_ARG_BOOL("-h", help);
854 READ_ARG_BOOL("--help", help);
855 READ_ARG_STRING("--pidfile", pidfile, MAX_PIDFILE_LEN);
863 r = init_logctx(&lc, argv[0], debug_level, logfile,
864 REDIRECT_STDOUT|REDIRECT_STDERR);
866 XSEGLOG("Cannot initialize logging to logfile");
869 XSEGLOG2(&lc, D, "Main thread has tid %ld.\n", syscall(SYS_gettid));
872 pid_fd = pidfile_open(pidfile, &old_pid);
875 XSEGLOG2(&lc, E, "Daemon already running, pid: %d.", old_pid);
877 XSEGLOG2(&lc, E, "Cannot open or create pidfile");
884 if (daemon(0, 1) < 0){
885 XSEGLOG2(&lc, E, "Cannot daemonize");
891 pidfile_write(pid_fd);
893 //TODO perform argument sanity checks
894 verbose = debug_level;
896 portno_start = portno;
899 if (portno_start == -1 || portno_end == -1){
900 XSEGLOG2(&lc, E, "Portno or {portno_start, portno_end} must be supplied");
906 peer = peerd_init(nr_ops, spec, portno_start, portno_end, nr_threads, defer_portno);
912 r = custom_peer_init(peer, argc, argv);
917 peerd_start_threads(peer);
918 #elif defined(ST_THREADS)
919 st_thread_t st = st_thread_create(init_peerd_loop, peer, 1, 0);
920 r = st_thread_join(st, NULL);
922 r = init_peerd_loop(peer);
926 pidfile_remove(pidfile, pid_fd);