modify logging mechanism to support redirection of std streams and logfile reopen
[archipelago] / xseg / peers / user / peer.c
1 #define _GNU_SOURCE
2 #include <stdio.h>
3 #include <stdlib.h>
4 #include <sys/types.h>
5 #include <unistd.h>
6 #include <xseg/xseg.h>
7 #include <peer.h>
8 #include <sys/syscall.h>
9 #include <sys/time.h>
10 #include <signal.h>
11 #include <sys/stat.h>
12 #include <fcntl.h>
13 #include <errno.h>
14 #ifdef MT
15 #include <pthread.h>
16 #endif
17
18 #ifdef MT
19 #define PEER_TYPE "pthread"
20 #else
21 #define PEER_TYPE "posix"
22 #endif
23
24 volatile unsigned int terminated = 0;
25 unsigned int verbose = 0;
26 struct log_ctx lc;
27 #ifdef ST_THREADS
28 uint32_t ta = 0;
29 #endif
30
31 #ifdef MT
/*
 * Per-worker bookkeeping for the pthread (MT) flavour of the peer.
 */
struct thread {
	/* Owning peer; filled in by peerd_start_threads(). */
	struct peerd *peer;
	/* Handle written by pthread_create(). */
	pthread_t tid;
	/* Condition signalled by __wake_up_thread() while holding 'lock'. */
	pthread_cond_t cond;
	pthread_mutex_t lock;
	/* One-shot job installed by thread_execute() and run by thread_loop(). */
	void (*func)(void *arg);
	/* Argument handed to 'func'. */
	void *arg;
};
40
41
42 inline static struct thread* alloc_thread(struct peerd *peer)
43 {
44         xqindex idx = xq_pop_head(&peer->threads, 1);
45         if (idx == Noneidx)
46                 return NULL;
47         return peer->thread + idx;
48 }
49
50 inline static void free_thread(struct peerd *peer, struct thread *t)
51 {
52         xqindex idx = t - peer->thread;
53         xq_append_head(&peer->threads, idx, 1);
54 }
55
56
57 inline static void __wake_up_thread(struct thread *t)
58 {
59         pthread_mutex_lock(&t->lock);
60         pthread_cond_signal(&t->cond);
61         pthread_mutex_unlock(&t->lock);
62 }
63
/*
 * NULL-tolerant wrapper around __wake_up_thread().
 */
inline static void wake_up_thread(struct thread* t)
{
	if (!t)
		return;

	__wake_up_thread(t);
}
70
71 inline static int wake_up_next_thread(struct peerd *peer)
72 {
73         return (xseg_signal(peer->xseg, peer->portno_start));
74 }
75 #endif
76
77
78 static inline int isTerminate()
79 {
80 /* ta doesn't need to be taken into account, because the main loops
81  * doesn't check the terminated flag if ta is not 0.
82  *
83 #ifdef ST_THREADS
84         return (!ta & terminated);
85 #else
86         return terminated;
87 #endif
88         */
89         return terminated;
90 }
91
92 void signal_handler(int signal)
93 {
94         XSEGLOG2(&lc, I, "Caught signal. Terminating gracefully");
95         terminated = 1;
96 }
97
98 void renew_logfile(int signal)
99 {
100         XSEGLOG2(&lc, I, "Caught signal. Renewing logfile");
101         renew_logctx(&lc, NULL, lc.log_level, NULL, REOPEN_FILE);
102 }
103
104 static int setup_signals()
105 {
106         int r;
107         struct sigaction sa;
108         sigemptyset(&sa.sa_mask);
109         sa.sa_flags = 0;
110         sa.sa_handler = signal_handler;
111         r = sigaction(SIGTERM, &sa, NULL);
112         if (r < 0)
113                 return r;
114         r = sigaction(SIGINT, &sa, NULL);
115         if (r < 0)
116                 return r;
117         r = sigaction(SIGQUIT, &sa, NULL);
118         if (r < 0)
119                 return r;
120
121         sa.sa_handler = renew_logfile;
122         r = sigaction(SIGUSR1, &sa, NULL);
123         if (r < 0)
124                 return r;
125
126         return r;
127 }
128
129 inline int canDefer(struct peerd *peer)
130 {
131         return !(peer->defer_portno == NoPort);
132 }
133
134 void print_req(struct xseg *xseg, struct xseg_request *req)
135 {
136         char target[64], data[64];
137         char *req_target, *req_data;
138         unsigned int end = (req->targetlen> 63) ? 63 : req->targetlen;
139         req_target = xseg_get_target(xseg, req);
140         req_data = xseg_get_data(xseg, req);
141
142         if (1) {
143                 strncpy(target, req_target, end);
144                 target[end] = 0;
145                 strncpy(data, req_data, 63);
146                 data[63] = 0;
147                 printf("req id:%lu, op:%u %llu:%lu serviced: %lu, reqstate: %u\n"
148                                 "src: %u, st: %u, dst: %u dt: %u\n"
149                                 "target[%u]:'%s', data[%llu]:\n%s------------------\n\n",
150                                 (unsigned long)(req),
151                                 (unsigned int)req->op,
152                                 (unsigned long long)req->offset,
153                                 (unsigned long)req->size,
154                                 (unsigned long)req->serviced,
155                                 (unsigned int)req->state,
156                                 (unsigned int)req->src_portno,
157                                 (unsigned int)req->src_transit_portno,
158                                 (unsigned int)req->dst_portno,
159                                 (unsigned int)req->dst_transit_portno,
160                                 (unsigned int)req->targetlen, target,
161                                 (unsigned long long)req->datalen, data);
162         }
163 }
164 void log_pr(char *msg, struct peer_req *pr)
165 {
166         char target[64], data[64];
167         char *req_target, *req_data;
168         struct peerd *peer = pr->peer;
169         struct xseg *xseg = pr->peer->xseg;
170         req_target = xseg_get_target(xseg, pr->req);
171         req_data = xseg_get_data(xseg, pr->req);
172         /* null terminate name in case of req->target is less than 63 characters,
173          * and next character after name (aka first byte of next buffer) is not
174          * null
175          */
176         unsigned int end = (pr->req->targetlen> 63) ? 63 : pr->req->targetlen;
177         if (verbose) {
178                 strncpy(target, req_target, end);
179                 target[end] = 0;
180                 strncpy(data, req_data, 63);
181                 data[63] = 0;
182                 printf("%s: req id:%u, op:%u %llu:%lu serviced: %lu, retval: %lu, reqstate: %u\n"
183                                 "target[%u]:'%s', data[%llu]:\n%s------------------\n\n",
184                                 msg,
185                                 (unsigned int)(pr - peer->peer_reqs),
186                                 (unsigned int)pr->req->op,
187                                 (unsigned long long)pr->req->offset,
188                                 (unsigned long)pr->req->size,
189                                 (unsigned long)pr->req->serviced,
190                                 (unsigned long)pr->retval,
191                                 (unsigned int)pr->req->state,
192                                 (unsigned int)pr->req->targetlen, target,
193                                 (unsigned long long)pr->req->datalen, data);
194         }
195 }
196
197 inline struct peer_req *alloc_peer_req(struct peerd *peer)
198 {
199         xqindex idx = xq_pop_head(&peer->free_reqs, 1);
200         if (idx == Noneidx)
201                 return NULL;
202         return peer->peer_reqs + idx;
203 }
204
205 inline void free_peer_req(struct peerd *peer, struct peer_req *pr)
206 {
207         xqindex idx = pr - peer->peer_reqs;
208         pr->req = NULL;
209         xq_append_head(&peer->free_reqs, idx, 1);
210 }
211
/*
 * Accounting for time spent in xseg_respond(): start/end stamps, the
 * accumulated duration and the number of samples.
 */
struct timeval resp_start, resp_end, resp_accum = {0, 0};
uint64_t responds = 0;

/*
 * Print the accumulated respond time and sample count to stdout.
 * The timeval fields are cast explicitly because time_t and suseconds_t
 * are not guaranteed to match the %lu conversion.
 */
void get_responds_stats(void)
{
	printf("Time waiting respond %lu.%06lu sec for %llu times.\n",
			(unsigned long) resp_accum.tv_sec,
			(unsigned long) resp_accum.tv_usec,
			(unsigned long long) responds);
}
219
220 //FIXME error check
221 void fail(struct peerd *peer, struct peer_req *pr)
222 {
223         struct xseg_request *req = pr->req;
224         uint32_t p;
225         XSEGLOG2(&lc, D, "failing req %u", (unsigned int) (pr - peer->peer_reqs));
226         req->state |= XS_FAILED;
227         //xseg_set_req_data(peer->xseg, pr->req, NULL);
228         p = xseg_respond(peer->xseg, req, pr->portno, X_ALLOC);
229         xseg_signal(peer->xseg, p);
230         free_peer_req(peer, pr);
231 #ifdef MT
232         wake_up_next_thread(peer);
233 #endif
234 }
235
236 //FIXME error check
237 void complete(struct peerd *peer, struct peer_req *pr)
238 {
239         struct xseg_request *req = pr->req;
240         uint32_t p;
241         req->state |= XS_SERVED;
242         //xseg_set_req_data(peer->xseg, pr->req, NULL);
243         //gettimeofday(&resp_start, NULL);
244         p = xseg_respond(peer->xseg, req, pr->portno, X_ALLOC);
245         //gettimeofday(&resp_end, NULL);
246         //responds++;
247         //timersub(&resp_end, &resp_start, &resp_end);
248         //timeradd(&resp_end, &resp_accum, &resp_accum);
249         //printf("xseg_signal: %u\n", p);
250         xseg_signal(peer->xseg, p);
251         free_peer_req(peer, pr);
252 #ifdef MT
253         wake_up_next_thread(peer);
254 #endif
255 }
256
257 static void handle_accepted(struct peerd *peer, struct peer_req *pr, 
258                                 struct xseg_request *req)
259 {
260         struct xseg_request *xreq = pr->req;
261         //assert xreq == req;
262         XSEGLOG2(&lc, D, "Handle accepted");
263         xreq->serviced = 0;
264         //xreq->state = XS_ACCEPTED;
265         pr->retval = 0;
266         dispatch(peer, pr, req, dispatch_accept);
267 }
268
269 static void handle_received(struct peerd *peer, struct peer_req *pr,
270                                 struct xseg_request *req)
271 {
272         //struct xseg_request *req = pr->req;
273         //assert req->state != XS_ACCEPTED;
274         XSEGLOG2(&lc, D, "Handle received \n");
275         dispatch(peer, pr, req, dispatch_receive);
276
277 }
/*
 * Accounting for time spent in xseg_submit(): start/end stamps, the
 * accumulated duration and the number of samples.
 */
struct timeval sub_start, sub_end, sub_accum = {0, 0};
uint64_t submits = 0;

/*
 * Print the accumulated submit time and sample count to stdout.
 * The timeval fields are cast explicitly because time_t and suseconds_t
 * are not guaranteed to match the %lu conversion.
 */
void get_submits_stats(void)
{
	printf("Time waiting submit %lu.%06lu sec for %llu times.\n",
			(unsigned long) sub_accum.tv_sec,
			(unsigned long) sub_accum.tv_usec,
			(unsigned long long) submits);
}
285
286 int submit_peer_req(struct peerd *peer, struct peer_req *pr)
287 {
288         uint32_t ret;
289         struct xseg_request *req = pr->req;
290         // assert req->portno == peer->portno ?
291         //TODO small function with error checking
292         XSEGLOG2 (&lc, D, "submitting peer req %u\n", (unsigned int)(pr - peer->peer_reqs));
293         ret = xseg_set_req_data(peer->xseg, req, (void *)(pr));
294         if (ret < 0)
295                 return -1;
296         //printf("pr: %x , req_data: %x \n", pr, xseg_get_req_data(peer->xseg, req));
297         //gettimeofday(&sub_start, NULL);
298         ret = xseg_submit(peer->xseg, req, pr->portno, X_ALLOC);
299         //gettimeofday(&sub_end, NULL);
300         //submits++;
301         //timersub(&sub_end, &sub_start, &sub_end);
302         //timeradd(&sub_end, &sub_accum, &sub_accum);
303         if (ret == NoPort)
304                 return -1;
305         xseg_signal(peer->xseg, ret);
306         return 0;
307 }
308
309 static int check_ports(struct peerd *peer)
310 {
311         struct xseg *xseg = peer->xseg;
312         xport portno_start = peer->portno_start;
313         xport portno_end = peer->portno_end;
314         struct xseg_request *accepted, *received;
315         struct peer_req *pr;
316         xport i;
317         int  r, c = 0;
318
319         for (i = portno_start; i <= portno_end; i++) {
320                 accepted = NULL;
321                 received = NULL;
322                 if (!isTerminate()) {
323                         pr = alloc_peer_req(peer);
324                         if (pr) {
325                                 accepted = xseg_accept(xseg, i, X_NONBLOCK);
326                                 if (accepted) {
327                                         pr->req = accepted;
328                                         pr->portno = i;
329                                         xseg_cancel_wait(xseg, i);
330                                         handle_accepted(peer, pr, accepted);
331                                         c = 1;
332                                 }
333                                 else {
334                                         free_peer_req(peer, pr);
335                                 }
336                         }
337                 }
338                 received = xseg_receive(xseg, i, X_NONBLOCK);
339                 if (received) {
340                         r =  xseg_get_req_data(xseg, received, (void **) &pr);
341                         if (r < 0 || !pr){
342                                 XSEGLOG2(&lc, W, "Received request with no pr data\n");
343                                 xport p = xseg_respond(peer->xseg, received, peer->portno_start, X_ALLOC);
344                                 if (p == NoPort){
345                                         XSEGLOG2(&lc, W, "Could not respond stale request");
346                                         xseg_put_request(xseg, received, portno_start);
347                                         continue;
348                                 } else {
349                                         xseg_signal(xseg, p);
350                                 }
351                         } else {
352                                 //maybe perform sanity check for pr
353                                 xseg_cancel_wait(xseg, i);
354                                 handle_received(peer, pr, received);
355                                 c = 1;
356                         }
357                 }
358         }
359
360         return c;
361 }
362
363 #ifdef MT
364 int thread_execute(struct peerd *peer, void (*func)(void *arg), void *arg)
365 {
366         struct thread *t = alloc_thread(peer);
367         if (t) {
368                 t->func = func;
369                 t->arg = arg;
370                 wake_up_thread(t);
371                 return 0;
372         } else
373                 // we could hijack a thread
374                 return -1;
375 }
376
377 static void* thread_loop(void *arg)
378 {
379         struct thread *t = (struct thread *) arg;
380         struct peerd *peer = t->peer;
381         struct xseg *xseg = peer->xseg;
382         xport portno_start = peer->portno_start;
383         xport portno_end = peer->portno_end;
384         pid_t pid =syscall(SYS_gettid);
385         uint64_t loops;
386         uint64_t threshold=1000/(1 + portno_end - portno_start);
387
388         XSEGLOG2(&lc, D, "thread %u\n",  (unsigned int) (t- peer->thread));
389
390         XSEGLOG2(&lc, I, "Thread %u has tid %u.\n", (unsigned int) (t- peer->thread), pid);
391         xseg_init_local_signal(xseg, peer->portno_start);
392         for (;!(isTerminate() && xq_count(&peer->free_reqs) == peer->nr_ops);) {
393                 if (t->func) {
394                         XSEGLOG2(&lc, D, "Thread %u executes function\n", (unsigned int) (t- peer->thread));
395                         xseg_cancel_wait(xseg, peer->portno_start);
396                         t->func(t->arg);
397                         t->func = NULL;
398                         t->arg = NULL;
399                         continue;
400                 }
401
402                 for(loops= threshold; loops > 0; loops--) {
403                         if (loops == 1)
404                                 xseg_prepare_wait(xseg, peer->portno_start);
405                         if (check_ports(peer))
406                                 loops = threshold;
407                 }
408                 XSEGLOG2(&lc, I, "Thread %u goes to sleep\n", (unsigned int) (t- peer->thread));
409                 xseg_wait_signal(xseg, 10000000UL);
410                 xseg_cancel_wait(xseg, peer->portno_start);
411                 XSEGLOG2(&lc, I, "Thread %u woke up\n", (unsigned int) (t- peer->thread));
412         }
413         wake_up_next_thread(peer);
414         return NULL;
415 }
416
417 int peerd_start_threads(struct peerd *peer)
418 {
419         int i;
420         uint32_t nr_threads = peer->nr_threads;
421         //TODO err check
422         for (i = 0; i < nr_threads; i++) {
423                 peer->thread[i].peer = peer;
424                 pthread_cond_init(&peer->thread[i].cond,NULL);
425                 pthread_mutex_init(&peer->thread[i].lock, NULL);
426                 pthread_create(&peer->thread[i].tid, NULL, thread_loop, (void *)(peer->thread + i));
427                 peer->thread[i].func = NULL;
428                 peer->thread[i].arg = NULL;
429
430         }
431         return 0;
432 }
433 #endif
434
/*
 * Placeholder: deferring is currently disabled and this function does
 * nothing. The disabled implementation submitted pr->req to
 * peer->defer_portno, signalled that port and freed 'pr'; callers were
 * expected to check canDefer(peer) first.
 */
void defer_request(struct peerd *peer, struct peer_req *pr)
{
	(void)peer;
	(void)pr;
}
442
443 static int peerd_loop(struct peerd *peer) 
444 {
445 #ifdef MT
446         int i;
447         if (peer->interactive_func)
448                 peer->interactive_func();
449         for (i = 0; i < peer->nr_threads; i++) {
450                 pthread_join(peer->thread[i].tid, NULL);
451         }
452 #else
453         struct xseg *xseg = peer->xseg;
454         xport portno_start = peer->portno_start;
455         xport portno_end = peer->portno_end;
456         uint64_t threshold=1000/(1 + portno_end - portno_start);
457         pid_t pid =syscall(SYS_gettid);
458         uint64_t loops;
459
460         XSEGLOG2(&lc, I, "Peer has tid %u.\n", pid);
461         xseg_init_local_signal(xseg, peer->portno_start);
462         for (;!(isTerminate() && xq_count(&peer->free_reqs) == peer->nr_ops);) {
463                 for(loops= threshold; loops > 0; loops--) {
464                         if (loops == 1)
465                                 xseg_prepare_wait(xseg, peer->portno_start);
466                         if (check_ports(peer))
467                                 loops = threshold;
468                 }
469 #ifdef ST_THREADS
470                 if (ta){
471                         st_sleep(0);
472                 } else {
473 #endif
474                         XSEGLOG2(&lc, I, "Peer goes to sleep\n");
475                         xseg_wait_signal(xseg, 10000000UL);
476                         xseg_cancel_wait(xseg, peer->portno_start);
477                         XSEGLOG2(&lc, I, "Peer woke up\n");
478 #ifdef ST_THREADS
479                 }
480 #endif
481         }
482         xseg_quit_local_signal(xseg, peer->portno_start);
483 #endif
484         return 0;
485 }
486
487 static struct xseg *join(char *spec)
488 {
489         struct xseg_config config;
490         struct xseg *xseg;
491
492         (void)xseg_parse_spec(spec, &config);
493         xseg = xseg_join(config.type, config.name, PEER_TYPE, NULL);
494         if (xseg)
495                 return xseg;
496
497         (void)xseg_create(&config);
498         return xseg_join(config.type, config.name, PEER_TYPE, NULL);
499 }
500
501 static struct peerd* peerd_init(uint32_t nr_ops, char* spec, long portno_start,
502                         long portno_end, uint32_t nr_threads, uint32_t defer_portno)
503 {
504         int i;
505         struct peerd *peer;
506         struct xseg_port *port;
507
508 #ifdef ST_THREADS
509         st_init();
510 #endif
511         peer = malloc(sizeof(struct peerd));
512         if (!peer) {
513                 perror("malloc");
514                 return NULL;
515         }
516         peer->nr_ops = nr_ops;
517         peer->defer_portno = defer_portno;
518 #ifdef MT
519         peer->nr_threads = nr_threads;
520         peer->thread = calloc(nr_threads, sizeof(struct thread));
521         if (!peer->thread)
522                 goto malloc_fail;
523 #endif
524         peer->peer_reqs = calloc(nr_ops, sizeof(struct peer_req));
525         if (!peer->peer_reqs){
526 malloc_fail:
527                 perror("malloc");
528                 return NULL;
529         }
530
531         if (!xq_alloc_seq(&peer->free_reqs, nr_ops, nr_ops))
532                 goto malloc_fail;
533 #ifdef MT
534         if (!xq_alloc_empty(&peer->threads, nr_threads))
535                 goto malloc_fail;
536 #endif
537         if (xseg_initialize()){
538                 printf("cannot initialize library\n");
539                 return NULL;
540         }
541         peer->xseg = join(spec);
542         if (!peer->xseg) 
543                 return NULL;
544
545         peer->portno_start = (xport) portno_start;
546         peer->portno_end= (xport) portno_end;
547         port = xseg_bind_port(peer->xseg, peer->portno_start, NULL);
548         if (!port){
549                 printf("cannot bind to port %u\n", (unsigned int) peer->portno_start);
550                 return NULL;
551         }
552
553         xport p;
554         for (p = peer->portno_start + 1; p <= peer->portno_end; p++) {
555                 struct xseg_port *tmp;
556                 tmp = xseg_bind_port(peer->xseg, p, (void *)xseg_get_signal_desc(peer->xseg, port));
557                 if (!tmp){
558                         printf("cannot bind to port %u\n", (unsigned int) p);
559                         return NULL;
560                 }
561         }
562
563         printf("Peer on ports  %u-%u\n", peer->portno_start,
564                         peer->portno_end);
565
566         for (i = 0; i < nr_ops; i++) {
567                 peer->peer_reqs[i].peer = peer;
568                 peer->peer_reqs[i].req = NULL;
569                 peer->peer_reqs[i].retval = 0;
570                 peer->peer_reqs[i].priv = NULL;
571                 peer->peer_reqs[i].portno = NoPort;
572 #ifdef ST_THREADS
573                 peer->peer_reqs[i].cond = st_cond_new(); //FIXME err check
574 #endif
575         }
576 #ifdef MT
577         peer->interactive_func = NULL;
578 #endif
579         return peer;
580 }
581
/*
 * Close the pidfile descriptor 'fd' and unlink the file at 'path'.
 * Returns the result of unlink(); the result of close() is ignored.
 */
int pidfile_remove(char *path, int fd)
{
	int rc;

	close(fd);
	rc = unlink(path);

	return rc;
}
587
/*
 * Write the caller's thread id (as returned by the gettid system call, in
 * decimal, no trailing newline) at the start of the file open on 'pid_fd'.
 *
 * Returns the number of bytes written, or a negative value if 'pid_fd' is
 * invalid, the id could not be formatted, or seeking/writing failed.
 */
int pidfile_write(int pid_fd)
{
	char buf[16];
	int len;

	/* Callers may not have a pidfile at all. */
	if (pid_fd < 0)
		return -1;

	len = snprintf(buf, sizeof(buf), "%ld", (long) syscall(SYS_gettid));
	if (len < 0 || (size_t) len >= sizeof(buf))
		return -1;

	if (lseek(pid_fd, 0, SEEK_SET) < 0)
		return -1;

	return (int) write(pid_fd, buf, (size_t) len);
}
598
/*
 * Read a process id from the file at 'path'.
 *
 * The file must contain nothing but a positive decimal number (at most 15
 * characters are read). On success the number is stored in '*pid' and 0
 * is returned. On any failure (file cannot be opened or read, file is
 * empty, content is not a clean positive number) '*pid' is set to 0 and
 * -1 is returned.
 */
int pidfile_read(char *path, pid_t *pid)
{
	char buf[16], *endptr;
	ssize_t n;
	long val;
	int fd;

	*pid = 0;

	fd = open(path, O_RDONLY);
	if (fd < 0)
		return -1;
	n = read(fd, buf, sizeof(buf) - 1);
	close(fd);
	if (n <= 0)
		return -1;

	/* Terminate exactly where the data ends; the rest of buf is garbage. */
	buf[n] = 0;

	val = strtol(buf, &endptr, 10);
	if (endptr == buf || endptr != &buf[n] || val <= 0)
		return -1;

	*pid = (pid_t) val;
	return 0;
}
621
/*
 * Exclusively create the pidfile at 'path' and open it for writing.
 *
 * Returns the new descriptor, or a negative value on failure. '*old_pid'
 * is always initialised: it is 0 unless the file already existed and a
 * pid could be read from it with pidfile_read().
 *
 * Original note on the O_EXCL creation: "nfs version > 3".
 */
int pidfile_open(char *path, pid_t *old_pid)
{
	int fd;

	*old_pid = 0;

	/* O_CREAT requires an explicit mode argument. */
	fd = open(path, O_CREAT|O_EXCL|O_WRONLY, 0644);
	/* errno values are positive: compare with EEXIST, not -EEXIST. */
	if (fd < 0 && errno == EEXIST)
		pidfile_read(path, old_pid);

	return fd;
}
632
633 int main(int argc, char *argv[])
634 {
635         struct peerd *peer = NULL;
636         //parse args
637         char *spec = "";
638         int i, r, daemonize = 0;
639         long portno_start = -1, portno_end = -1, portno = -1;
640         //set defaults here
641         uint32_t nr_ops = 16;
642         uint32_t nr_threads = 16 ;
643         unsigned int debug_level = 0;
644         uint32_t defer_portno = NoPort;
645         char *logfile = NULL;
646         char *pidfile = NULL;
647         pid_t old_pid;
648         int pid_fd = -1;
649
650         //capture here -g spec, -n nr_ops, -p portno, -t nr_threads -v verbose level
651         // -dp xseg_portno to defer blocking requests
652         // -l log file ?
653         //TODO print messages on arg parsing error
654         //TODO string checking 
655
656         for (i = 1; i < argc; i++) {
657                 if (!strcmp(argv[i], "-g") && i + 1 < argc) {
658                         spec = argv[i+1];
659                         i += 1;
660                         continue;
661                 }
662
663                 if (!strcmp(argv[i], "-sp") && i + 1 < argc) {
664                         portno_start = strtoul(argv[i+1], NULL, 10);
665                         i += 1;
666                         continue;
667                 }
668
669                 if (!strcmp(argv[i], "-ep") && i + 1 < argc) {
670                         portno_end = strtoul(argv[i+1], NULL, 10);
671                         i += 1;
672                         continue;
673                 }
674
675                 if (!strcmp(argv[i], "-p") && i + 1 < argc) {
676                         portno = strtoul(argv[i+1], NULL, 10);
677                         i += 1;
678                         continue;
679                 }
680
681                 if (!strcmp(argv[i], "-n") && i + 1 < argc) {
682                         nr_ops = strtoul(argv[i+1], NULL, 10);
683                         i += 1;
684                         continue;
685                 }
686                 if (!strcmp(argv[i], "-v") && i + 1 < argc ) {
687                         debug_level = atoi(argv[i+1]);
688                         i += 1;
689                         continue;
690                 }
691                 if (!strcmp(argv[i], "-t") && i + 1 < argc ) {
692                         nr_threads = strtoul(argv[i+1], NULL, 10);
693                         i += 1;
694                         continue;
695                 }
696                 if (!strcmp(argv[i], "-dp") && i + 1 < argc ) {
697                         defer_portno = strtoul(argv[i+1], NULL, 10);
698                         i += 1;
699                         continue;
700                 }
701                 if (!strcmp(argv[i], "-l") && i + 1 < argc ) {
702                         logfile = argv[i+1];
703                         i += 1;
704                         continue;
705                 }
706                 if (!strcmp(argv[i], "-d")) {
707                         daemonize = 1;
708                         continue;
709                 }
710                 if (!strcmp(argv[i], "--pidfile") && i + 1 < argc ) {
711                         pidfile = argv[i+1];
712                         i += 1;
713                         continue;
714                 }
715
716         }
717         r = init_logctx(&lc, argv[0], debug_level, logfile,
718                         REDIRECT_STDOUT|REDIRECT_STDERR);
719         if (r < 0){
720                 XSEGLOG("Cannot initialize logging to logfile");
721                 return -1;
722         }
723         XSEGLOG2(&lc, D, "Main thread has tid %ld.\n", syscall(SYS_gettid));
724
725         if (pidfile){
726                 pid_fd = pidfile_open(pidfile, &old_pid);
727                 if (pid_fd < 0) {
728                         if (old_pid) {
729                                 XSEGLOG2(&lc, E, "Daemon already running, pid: %d.", old_pid);
730                         } else {
731                                 XSEGLOG2(&lc, E, "Cannot open or create pidfile");
732                         }
733                         return -1;
734                 }
735         }
736
737         if (daemonize){
738                 if (daemon(0, 1) < 0){
739                         XSEGLOG2(&lc, E, "Cannot daemonize");
740                         r = -1;
741                         goto out;
742                 }
743         }
744
745         pidfile_write(pid_fd);
746
747         //TODO perform argument sanity checks
748         verbose = debug_level;
749         if (portno != -1) {
750                 portno_start = portno;
751                 portno_end = portno;
752         }
753
754         setup_signals();
755         //TODO err check
756         peer = peerd_init(nr_ops, spec, portno_start, portno_end, nr_threads, defer_portno);
757         if (!peer){
758                 r = -1;
759                 goto out;
760         }
761         r = custom_peer_init(peer, argc, argv);
762         if (r < 0)
763                 goto out;
764 #ifdef MT
765         peerd_start_threads(peer);
766 #endif
767
768 #ifdef ST_THREADS
769         st_thread_t st = st_thread_create(peerd_loop, peer, 1, 0);
770         r = st_thread_join(st, NULL);
771 #else
772         r = peerd_loop(peer);
773 #endif
774 out:
775         if (pid_fd > 0)
776                 pidfile_remove(pidfile, pid_fd);
777         return r;
778 }