add prefix to strip in mt-pfiled
[archipelago] / xseg / peers / user / peer.c
1 #define _GNU_SOURCE
2 #include <stdio.h>
3 #include <stdlib.h>
4 #include <sys/types.h>
5 #include <unistd.h>
6 #include <xseg/xseg.h>
7 #include <peer.h>
8 #include <sys/syscall.h>
9 #include <sys/time.h>
10 #include <signal.h>
11 #include <sys/stat.h>
12 #include <fcntl.h>
13 #include <errno.h>
14 #ifdef MT
15 #include <pthread.h>
16 #endif
17
18 #ifdef MT
19 #define PEER_TYPE "pthread"
20 #else
21 #define PEER_TYPE "posix"
22 #endif
23
24 volatile unsigned int terminated = 0;
25 unsigned int verbose = 0;
26 struct log_ctx lc;
27 #ifdef ST_THREADS
28 uint32_t ta = 0;
29 #endif
30
31 #ifdef MT
32 struct peerd *global_peer;
33
34 struct thread {
35         struct peerd *peer;
36         pthread_t tid;
37         pthread_cond_t cond;
38         pthread_mutex_t lock;
39         void (*func)(void *arg);
40         void *arg;
41 };
42
43
44 inline static struct thread* alloc_thread(struct peerd *peer)
45 {
46         xqindex idx = xq_pop_head(&peer->threads, 1);
47         if (idx == Noneidx)
48                 return NULL;
49         return peer->thread + idx;
50 }
51
52 inline static void free_thread(struct peerd *peer, struct thread *t)
53 {
54         xqindex idx = t - peer->thread;
55         xq_append_head(&peer->threads, idx, 1);
56 }
57
58
59 inline static void __wake_up_thread(struct thread *t)
60 {
61         pthread_mutex_lock(&t->lock);
62         pthread_cond_signal(&t->cond);
63         pthread_mutex_unlock(&t->lock);
64 }
65
66 inline static void wake_up_thread(struct thread* t)
67 {
68         if (t){
69                 __wake_up_thread(t);
70         }
71 }
72
73 inline static int wake_up_next_thread(struct peerd *peer)
74 {
75         return (xseg_signal(peer->xseg, peer->portno_start));
76 }
77 #endif
78
79
80 static inline int isTerminate()
81 {
82 /* ta doesn't need to be taken into account, because the main loops
83  * doesn't check the terminated flag if ta is not 0.
84  */
85         /*
86 #ifdef ST_THREADS
87         return (!ta & terminated);
88 #else
89         return terminated;
90 #endif
91         */
92         return terminated;
93 }
94
95 void signal_handler(int signal)
96 {
97         XSEGLOG2(&lc, I, "Caught signal. Terminating gracefully");
98         terminated = 1;
99 #ifdef MT
100         wake_up_next_thread(global_peer);
101 #endif
102 }
103
104 void renew_logfile(int signal)
105 {
106         XSEGLOG2(&lc, I, "Caught signal. Renewing logfile");
107         renew_logctx(&lc, NULL, lc.log_level, NULL, REOPEN_FILE);
108 }
109
110 static int setup_signals(struct peerd *peer)
111 {
112         int r;
113         struct sigaction sa;
114 #ifdef MT
115         global_peer = peer;
116 #endif
117         sigemptyset(&sa.sa_mask);
118         sa.sa_flags = 0;
119         sa.sa_handler = signal_handler;
120         r = sigaction(SIGTERM, &sa, NULL);
121         if (r < 0)
122                 return r;
123         r = sigaction(SIGINT, &sa, NULL);
124         if (r < 0)
125                 return r;
126         r = sigaction(SIGQUIT, &sa, NULL);
127         if (r < 0)
128                 return r;
129
130         sa.sa_handler = renew_logfile;
131         r = sigaction(SIGUSR1, &sa, NULL);
132         if (r < 0)
133                 return r;
134
135         return r;
136 }
137
138 inline int canDefer(struct peerd *peer)
139 {
140         return !(peer->defer_portno == NoPort);
141 }
142
143 void print_req(struct xseg *xseg, struct xseg_request *req)
144 {
145         char target[64], data[64];
146         char *req_target, *req_data;
147         unsigned int end = (req->targetlen> 63) ? 63 : req->targetlen;
148         req_target = xseg_get_target(xseg, req);
149         req_data = xseg_get_data(xseg, req);
150
151         if (1) {
152                 strncpy(target, req_target, end);
153                 target[end] = 0;
154                 strncpy(data, req_data, 63);
155                 data[63] = 0;
156                 printf("req id:%lu, op:%u %llu:%lu serviced: %lu, reqstate: %u\n"
157                                 "src: %u, st: %u, dst: %u dt: %u\n"
158                                 "target[%u]:'%s', data[%llu]:\n%s------------------\n\n",
159                                 (unsigned long)(req),
160                                 (unsigned int)req->op,
161                                 (unsigned long long)req->offset,
162                                 (unsigned long)req->size,
163                                 (unsigned long)req->serviced,
164                                 (unsigned int)req->state,
165                                 (unsigned int)req->src_portno,
166                                 (unsigned int)req->src_transit_portno,
167                                 (unsigned int)req->dst_portno,
168                                 (unsigned int)req->dst_transit_portno,
169                                 (unsigned int)req->targetlen, target,
170                                 (unsigned long long)req->datalen, data);
171         }
172 }
173 void log_pr(char *msg, struct peer_req *pr)
174 {
175         char target[64], data[64];
176         char *req_target, *req_data;
177         struct peerd *peer = pr->peer;
178         struct xseg *xseg = pr->peer->xseg;
179         req_target = xseg_get_target(xseg, pr->req);
180         req_data = xseg_get_data(xseg, pr->req);
181         /* null terminate name in case of req->target is less than 63 characters,
182          * and next character after name (aka first byte of next buffer) is not
183          * null
184          */
185         unsigned int end = (pr->req->targetlen> 63) ? 63 : pr->req->targetlen;
186         if (verbose) {
187                 strncpy(target, req_target, end);
188                 target[end] = 0;
189                 strncpy(data, req_data, 63);
190                 data[63] = 0;
191                 printf("%s: req id:%u, op:%u %llu:%lu serviced: %lu, retval: %lu, reqstate: %u\n"
192                                 "target[%u]:'%s', data[%llu]:\n%s------------------\n\n",
193                                 msg,
194                                 (unsigned int)(pr - peer->peer_reqs),
195                                 (unsigned int)pr->req->op,
196                                 (unsigned long long)pr->req->offset,
197                                 (unsigned long)pr->req->size,
198                                 (unsigned long)pr->req->serviced,
199                                 (unsigned long)pr->retval,
200                                 (unsigned int)pr->req->state,
201                                 (unsigned int)pr->req->targetlen, target,
202                                 (unsigned long long)pr->req->datalen, data);
203         }
204 }
205
206 inline struct peer_req *alloc_peer_req(struct peerd *peer)
207 {
208         xqindex idx = xq_pop_head(&peer->free_reqs, 1);
209         if (idx == Noneidx)
210                 return NULL;
211         return peer->peer_reqs + idx;
212 }
213
214 inline void free_peer_req(struct peerd *peer, struct peer_req *pr)
215 {
216         xqindex idx = pr - peer->peer_reqs;
217         pr->req = NULL;
218         xq_append_head(&peer->free_reqs, idx, 1);
219 }
220
221 struct timeval resp_start, resp_end, resp_accum = {0, 0};
222 uint64_t responds = 0;
223 void get_responds_stats(){
224                 printf("Time waiting respond %lu.%06lu sec for %llu times.\n",
225                                 //(unsigned int)(t - peer->thread),
226                                 resp_accum.tv_sec, resp_accum.tv_usec, (long long unsigned int) responds);
227 }
228
229 //FIXME error check
230 void fail(struct peerd *peer, struct peer_req *pr)
231 {
232         struct xseg_request *req = pr->req;
233         uint32_t p;
234         XSEGLOG2(&lc, D, "failing req %u", (unsigned int) (pr - peer->peer_reqs));
235         req->state |= XS_FAILED;
236         //xseg_set_req_data(peer->xseg, pr->req, NULL);
237         p = xseg_respond(peer->xseg, req, pr->portno, X_ALLOC);
238         xseg_signal(peer->xseg, p);
239         free_peer_req(peer, pr);
240 #ifdef MT
241         wake_up_next_thread(peer);
242 #endif
243 }
244
245 //FIXME error check
246 void complete(struct peerd *peer, struct peer_req *pr)
247 {
248         struct xseg_request *req = pr->req;
249         uint32_t p;
250         req->state |= XS_SERVED;
251         //xseg_set_req_data(peer->xseg, pr->req, NULL);
252         //gettimeofday(&resp_start, NULL);
253         p = xseg_respond(peer->xseg, req, pr->portno, X_ALLOC);
254         //gettimeofday(&resp_end, NULL);
255         //responds++;
256         //timersub(&resp_end, &resp_start, &resp_end);
257         //timeradd(&resp_end, &resp_accum, &resp_accum);
258         //printf("xseg_signal: %u\n", p);
259         xseg_signal(peer->xseg, p);
260         free_peer_req(peer, pr);
261 #ifdef MT
262         wake_up_next_thread(peer);
263 #endif
264 }
265
266 static void handle_accepted(struct peerd *peer, struct peer_req *pr, 
267                                 struct xseg_request *req)
268 {
269         struct xseg_request *xreq = pr->req;
270         //assert xreq == req;
271         XSEGLOG2(&lc, D, "Handle accepted");
272         xreq->serviced = 0;
273         //xreq->state = XS_ACCEPTED;
274         pr->retval = 0;
275         dispatch(peer, pr, req, dispatch_accept);
276 }
277
278 static void handle_received(struct peerd *peer, struct peer_req *pr,
279                                 struct xseg_request *req)
280 {
281         //struct xseg_request *req = pr->req;
282         //assert req->state != XS_ACCEPTED;
283         XSEGLOG2(&lc, D, "Handle received \n");
284         dispatch(peer, pr, req, dispatch_receive);
285
286 }
287 struct timeval sub_start, sub_end, sub_accum = {0, 0};
288 uint64_t submits = 0;
289 void get_submits_stats(){
290                 printf("Time waiting submit %lu.%06lu sec for %llu times.\n",
291                                 //(unsigned int)(t - peer->thread),
292                                 sub_accum.tv_sec, sub_accum.tv_usec, (long long unsigned int) submits);
293 }
294
295 int submit_peer_req(struct peerd *peer, struct peer_req *pr)
296 {
297         uint32_t ret;
298         struct xseg_request *req = pr->req;
299         // assert req->portno == peer->portno ?
300         //TODO small function with error checking
301         XSEGLOG2 (&lc, D, "submitting peer req %u\n", (unsigned int)(pr - peer->peer_reqs));
302         ret = xseg_set_req_data(peer->xseg, req, (void *)(pr));
303         if (ret < 0)
304                 return -1;
305         //printf("pr: %x , req_data: %x \n", pr, xseg_get_req_data(peer->xseg, req));
306         //gettimeofday(&sub_start, NULL);
307         ret = xseg_submit(peer->xseg, req, pr->portno, X_ALLOC);
308         //gettimeofday(&sub_end, NULL);
309         //submits++;
310         //timersub(&sub_end, &sub_start, &sub_end);
311         //timeradd(&sub_end, &sub_accum, &sub_accum);
312         if (ret == NoPort)
313                 return -1;
314         xseg_signal(peer->xseg, ret);
315         return 0;
316 }
317
318 static int check_ports(struct peerd *peer)
319 {
320         struct xseg *xseg = peer->xseg;
321         xport portno_start = peer->portno_start;
322         xport portno_end = peer->portno_end;
323         struct xseg_request *accepted, *received;
324         struct peer_req *pr;
325         xport i;
326         int  r, c = 0;
327
328         for (i = portno_start; i <= portno_end; i++) {
329                 accepted = NULL;
330                 received = NULL;
331                 if (!isTerminate()) {
332                         pr = alloc_peer_req(peer);
333                         if (pr) {
334                                 accepted = xseg_accept(xseg, i, X_NONBLOCK);
335                                 if (accepted) {
336                                         pr->req = accepted;
337                                         pr->portno = i;
338                                         xseg_cancel_wait(xseg, i);
339                                         handle_accepted(peer, pr, accepted);
340                                         c = 1;
341                                 }
342                                 else {
343                                         free_peer_req(peer, pr);
344                                 }
345                         }
346                 }
347                 received = xseg_receive(xseg, i, X_NONBLOCK);
348                 if (received) {
349                         r =  xseg_get_req_data(xseg, received, (void **) &pr);
350                         if (r < 0 || !pr){
351                                 XSEGLOG2(&lc, W, "Received request with no pr data\n");
352                                 xport p = xseg_respond(peer->xseg, received, peer->portno_start, X_ALLOC);
353                                 if (p == NoPort){
354                                         XSEGLOG2(&lc, W, "Could not respond stale request");
355                                         xseg_put_request(xseg, received, portno_start);
356                                         continue;
357                                 } else {
358                                         xseg_signal(xseg, p);
359                                 }
360                         } else {
361                                 //maybe perform sanity check for pr
362                                 xseg_cancel_wait(xseg, i);
363                                 handle_received(peer, pr, received);
364                                 c = 1;
365                         }
366                 }
367         }
368
369         return c;
370 }
371
372 #ifdef MT
373 int thread_execute(struct peerd *peer, void (*func)(void *arg), void *arg)
374 {
375         struct thread *t = alloc_thread(peer);
376         if (t) {
377                 t->func = func;
378                 t->arg = arg;
379                 wake_up_thread(t);
380                 return 0;
381         } else
382                 // we could hijack a thread
383                 return -1;
384 }
385
386 static void* thread_loop(void *arg)
387 {
388         struct thread *t = (struct thread *) arg;
389         struct peerd *peer = t->peer;
390         struct xseg *xseg = peer->xseg;
391         xport portno_start = peer->portno_start;
392         xport portno_end = peer->portno_end;
393         pid_t pid =syscall(SYS_gettid);
394         uint64_t loops;
395         uint64_t threshold=1000/(1 + portno_end - portno_start);
396
397         XSEGLOG2(&lc, D, "thread %u\n",  (unsigned int) (t- peer->thread));
398
399         XSEGLOG2(&lc, I, "Thread %u has tid %u.\n", (unsigned int) (t- peer->thread), pid);
400         xseg_init_local_signal(xseg, peer->portno_start);
401         for (;!(isTerminate() && xq_count(&peer->free_reqs) == peer->nr_ops);) {
402                 if (t->func) {
403                         XSEGLOG2(&lc, D, "Thread %u executes function\n", (unsigned int) (t- peer->thread));
404                         xseg_cancel_wait(xseg, peer->portno_start);
405                         t->func(t->arg);
406                         t->func = NULL;
407                         t->arg = NULL;
408                         continue;
409                 }
410
411                 for(loops= threshold; loops > 0; loops--) {
412                         if (loops == 1)
413                                 xseg_prepare_wait(xseg, peer->portno_start);
414                         if (check_ports(peer))
415                                 loops = threshold;
416                 }
417                 XSEGLOG2(&lc, I, "Thread %u goes to sleep\n", (unsigned int) (t- peer->thread));
418                 xseg_wait_signal(xseg, 10000000UL);
419                 xseg_cancel_wait(xseg, peer->portno_start);
420                 XSEGLOG2(&lc, I, "Thread %u woke up\n", (unsigned int) (t- peer->thread));
421         }
422         wake_up_next_thread(peer);
423         return NULL;
424 }
425
426 int peerd_start_threads(struct peerd *peer)
427 {
428         int i;
429         uint32_t nr_threads = peer->nr_threads;
430         //TODO err check
431         for (i = 0; i < nr_threads; i++) {
432                 peer->thread[i].peer = peer;
433                 pthread_cond_init(&peer->thread[i].cond,NULL);
434                 pthread_mutex_init(&peer->thread[i].lock, NULL);
435                 pthread_create(&peer->thread[i].tid, NULL, thread_loop, (void *)(peer->thread + i));
436                 peer->thread[i].func = NULL;
437                 peer->thread[i].arg = NULL;
438
439         }
440         return 0;
441 }
442 #endif
443
444 void defer_request(struct peerd *peer, struct peer_req *pr)
445 {
446         // assert canDefer(peer);
447 //      xseg_submit(peer->xseg, peer->defer_portno, pr->req);
448 //      xseg_signal(peer->xseg, peer->defer_portno);
449 //      free_peer_req(peer, pr);
450 }
451
452 static int peerd_loop(struct peerd *peer) 
453 {
454 #ifdef MT
455         int i;
456         if (peer->interactive_func)
457                 peer->interactive_func();
458         for (i = 0; i < peer->nr_threads; i++) {
459                 pthread_join(peer->thread[i].tid, NULL);
460         }
461 #else
462         struct xseg *xseg = peer->xseg;
463         xport portno_start = peer->portno_start;
464         xport portno_end = peer->portno_end;
465         uint64_t threshold=1000/(1 + portno_end - portno_start);
466         pid_t pid =syscall(SYS_gettid);
467         uint64_t loops;
468
469         XSEGLOG2(&lc, I, "Peer has tid %u.\n", pid);
470         xseg_init_local_signal(xseg, peer->portno_start);
471         for (;!(isTerminate() && xq_count(&peer->free_reqs) == peer->nr_ops);) {
472                 for(loops= threshold; loops > 0; loops--) {
473                         if (loops == 1)
474                                 xseg_prepare_wait(xseg, peer->portno_start);
475                         if (check_ports(peer))
476                                 loops = threshold;
477                 }
478 #ifdef ST_THREADS
479                 if (ta){
480                         st_sleep(0);
481                 } else {
482 #endif
483                         XSEGLOG2(&lc, I, "Peer goes to sleep\n");
484                         xseg_wait_signal(xseg, 10000000UL);
485                         xseg_cancel_wait(xseg, peer->portno_start);
486                         XSEGLOG2(&lc, I, "Peer woke up\n");
487 #ifdef ST_THREADS
488                 }
489 #endif
490         }
491         xseg_quit_local_signal(xseg, peer->portno_start);
492 #endif
493         return 0;
494 }
495
496 static struct xseg *join(char *spec)
497 {
498         struct xseg_config config;
499         struct xseg *xseg;
500
501         (void)xseg_parse_spec(spec, &config);
502         xseg = xseg_join(config.type, config.name, PEER_TYPE, NULL);
503         if (xseg)
504                 return xseg;
505
506         (void)xseg_create(&config);
507         return xseg_join(config.type, config.name, PEER_TYPE, NULL);
508 }
509
510 static struct peerd* peerd_init(uint32_t nr_ops, char* spec, long portno_start,
511                         long portno_end, uint32_t nr_threads, uint32_t defer_portno)
512 {
513         int i;
514         struct peerd *peer;
515         struct xseg_port *port;
516
517 #ifdef ST_THREADS
518         st_init();
519 #endif
520         peer = malloc(sizeof(struct peerd));
521         if (!peer) {
522                 perror("malloc");
523                 return NULL;
524         }
525         peer->nr_ops = nr_ops;
526         peer->defer_portno = defer_portno;
527 #ifdef MT
528         peer->nr_threads = nr_threads;
529         peer->thread = calloc(nr_threads, sizeof(struct thread));
530         if (!peer->thread)
531                 goto malloc_fail;
532 #endif
533         peer->peer_reqs = calloc(nr_ops, sizeof(struct peer_req));
534         if (!peer->peer_reqs){
535 malloc_fail:
536                 perror("malloc");
537                 return NULL;
538         }
539
540         if (!xq_alloc_seq(&peer->free_reqs, nr_ops, nr_ops))
541                 goto malloc_fail;
542 #ifdef MT
543         if (!xq_alloc_empty(&peer->threads, nr_threads))
544                 goto malloc_fail;
545 #endif
546         if (xseg_initialize()){
547                 printf("cannot initialize library\n");
548                 return NULL;
549         }
550         peer->xseg = join(spec);
551         if (!peer->xseg) 
552                 return NULL;
553
554         peer->portno_start = (xport) portno_start;
555         peer->portno_end= (xport) portno_end;
556         port = xseg_bind_port(peer->xseg, peer->portno_start, NULL);
557         if (!port){
558                 printf("cannot bind to port %u\n", (unsigned int) peer->portno_start);
559                 return NULL;
560         }
561
562         xport p;
563         for (p = peer->portno_start + 1; p <= peer->portno_end; p++) {
564                 struct xseg_port *tmp;
565                 tmp = xseg_bind_port(peer->xseg, p, (void *)xseg_get_signal_desc(peer->xseg, port));
566                 if (!tmp){
567                         printf("cannot bind to port %u\n", (unsigned int) p);
568                         return NULL;
569                 }
570         }
571
572         printf("Peer on ports  %u-%u\n", peer->portno_start,
573                         peer->portno_end);
574
575         for (i = 0; i < nr_ops; i++) {
576                 peer->peer_reqs[i].peer = peer;
577                 peer->peer_reqs[i].req = NULL;
578                 peer->peer_reqs[i].retval = 0;
579                 peer->peer_reqs[i].priv = NULL;
580                 peer->peer_reqs[i].portno = NoPort;
581 #ifdef ST_THREADS
582                 peer->peer_reqs[i].cond = st_cond_new(); //FIXME err check
583 #endif
584         }
585 #ifdef MT
586         peer->interactive_func = NULL;
587 #endif
588         return peer;
589 }
590
591 int pidfile_remove(char *path, int fd)
592 {
593         close(fd);
594         return (unlink(path));
595 }
596
597 int pidfile_write(int pid_fd)
598 {
599         char buf[16];
600         snprintf(buf, sizeof(buf), "%ld", syscall(SYS_gettid));
601         buf[15] = 0;
602         
603         lseek(pid_fd, 0, SEEK_SET);
604         int ret = write(pid_fd, buf, strlen(buf));
605         return ret;
606 }
607
608 int pidfile_read(char *path, pid_t *pid)
609 {
610         char buf[16], *endptr;
611         *pid = 0;
612
613         int fd = open(path, O_RDONLY);
614         if (fd < 0)
615                 return -1;
616         int ret = read(fd, buf, 15);
617         buf[15]=0;
618         close(fd);
619         if (ret < 0)
620                 return -1;
621         else{
622                 *pid = strtol(buf, &endptr, 10);
623                 if (endptr != &buf[ret]){
624                         *pid = 0;
625                         return -1;
626                 }
627         }
628         return 0;
629 }
630
631 int pidfile_open(char *path, pid_t *old_pid)
632 {
633         //nfs version > 3
634         int fd = open(path, O_CREAT|O_EXCL|O_WRONLY);
635         if (fd < 0){
636                 if (errno == -EEXIST)
637                         pidfile_read(path, old_pid);
638         }
639         return fd;
640 }
641
642 int main(int argc, char *argv[])
643 {
644         struct peerd *peer = NULL;
645         //parse args
646         char *spec = "";
647         int i, r, daemonize = 0;
648         long portno_start = -1, portno_end = -1, portno = -1;
649         //set defaults here
650         uint32_t nr_ops = 16;
651         uint32_t nr_threads = 16 ;
652         unsigned int debug_level = 0;
653         uint32_t defer_portno = NoPort;
654         char *logfile = NULL;
655         char *pidfile = NULL;
656         pid_t old_pid;
657         int pid_fd = -1;
658
659         //capture here -g spec, -n nr_ops, -p portno, -t nr_threads -v verbose level
660         // -dp xseg_portno to defer blocking requests
661         // -l log file ?
662         //TODO print messages on arg parsing error
663         //TODO string checking
664
665         for (i = 1; i < argc; i++) {
666                 if (!strcmp(argv[i], "-g") && i + 1 < argc) {
667                         spec = argv[i+1];
668                         i += 1;
669                         continue;
670                 }
671
672                 if (!strcmp(argv[i], "-sp") && i + 1 < argc) {
673                         portno_start = strtoul(argv[i+1], NULL, 10);
674                         i += 1;
675                         continue;
676                 }
677
678                 if (!strcmp(argv[i], "-ep") && i + 1 < argc) {
679                         portno_end = strtoul(argv[i+1], NULL, 10);
680                         i += 1;
681                         continue;
682                 }
683
684                 if (!strcmp(argv[i], "-p") && i + 1 < argc) {
685                         portno = strtoul(argv[i+1], NULL, 10);
686                         i += 1;
687                         continue;
688                 }
689
690                 if (!strcmp(argv[i], "-n") && i + 1 < argc) {
691                         nr_ops = strtoul(argv[i+1], NULL, 10);
692                         i += 1;
693                         continue;
694                 }
695                 if (!strcmp(argv[i], "-v") && i + 1 < argc ) {
696                         debug_level = atoi(argv[i+1]);
697                         i += 1;
698                         continue;
699                 }
700                 if (!strcmp(argv[i], "-t") && i + 1 < argc ) {
701                         nr_threads = strtoul(argv[i+1], NULL, 10);
702                         i += 1;
703                         continue;
704                 }
705                 if (!strcmp(argv[i], "-dp") && i + 1 < argc ) {
706                         defer_portno = strtoul(argv[i+1], NULL, 10);
707                         i += 1;
708                         continue;
709                 }
710                 if (!strcmp(argv[i], "-l") && i + 1 < argc ) {
711                         logfile = argv[i+1];
712                         i += 1;
713                         continue;
714                 }
715                 if (!strcmp(argv[i], "-d")) {
716                         daemonize = 1;
717                         continue;
718                 }
719                 if (!strcmp(argv[i], "--pidfile") && i + 1 < argc ) {
720                         pidfile = argv[i+1];
721                         i += 1;
722                         continue;
723                 }
724
725         }
726         r = init_logctx(&lc, argv[0], debug_level, logfile,
727                         REDIRECT_STDOUT|REDIRECT_STDERR);
728         if (r < 0){
729                 XSEGLOG("Cannot initialize logging to logfile");
730                 return -1;
731         }
732         XSEGLOG2(&lc, D, "Main thread has tid %ld.\n", syscall(SYS_gettid));
733
734         if (pidfile){
735                 pid_fd = pidfile_open(pidfile, &old_pid);
736                 if (pid_fd < 0) {
737                         if (old_pid) {
738                                 XSEGLOG2(&lc, E, "Daemon already running, pid: %d.", old_pid);
739                         } else {
740                                 XSEGLOG2(&lc, E, "Cannot open or create pidfile");
741                         }
742                         return -1;
743                 }
744         }
745
746         if (daemonize){
747                 if (daemon(0, 1) < 0){
748                         XSEGLOG2(&lc, E, "Cannot daemonize");
749                         r = -1;
750                         goto out;
751                 }
752         }
753
754         pidfile_write(pid_fd);
755
756         //TODO perform argument sanity checks
757         verbose = debug_level;
758         if (portno != -1) {
759                 portno_start = portno;
760                 portno_end = portno;
761         }
762
763         //TODO err check
764         peer = peerd_init(nr_ops, spec, portno_start, portno_end, nr_threads, defer_portno);
765         if (!peer){
766                 r = -1;
767                 goto out;
768         }
769         setup_signals(peer);
770         r = custom_peer_init(peer, argc, argv);
771         if (r < 0)
772                 goto out;
773 #ifdef MT
774         peerd_start_threads(peer);
775 #endif
776
777 #ifdef ST_THREADS
778         st_thread_t st = st_thread_create(peerd_loop, peer, 1, 0);
779         r = st_thread_join(st, NULL);
780 #else
781         r = peerd_loop(peer);
782 #endif
783 out:
784         if (pid_fd > 0)
785                 pidfile_remove(pidfile, pid_fd);
786         return r;
787 }