add obj handlers output in xseg reportall
[archipelago] / xseg / peers / user / peer.c
1 #define _GNU_SOURCE
2 #include <stdio.h>
3 #include <stdlib.h>
4 #include <sys/types.h>
5 #include <unistd.h>
6 #include <xseg/xseg.h>
7 #include <peer.h>
8 #include <sys/syscall.h>
9 #include <sys/time.h>
10 #include <signal.h>
11 #include <sys/stat.h>
12 #include <fcntl.h>
13 #include <errno.h>
14 #ifdef MT
15 #include <pthread.h>
16 #endif
17
18 #ifdef MT
19 #define PEER_TYPE "pthread"
20 #else
21 #define PEER_TYPE "posix"
22 #endif
23
24 volatile unsigned int terminated = 0;
25 unsigned int verbose = 0;
26 struct log_ctx lc;
27 #ifdef ST_THREADS
28 uint32_t ta = 0;
29 #endif
30
31 #ifdef MT
32 struct thread {
33         struct peerd *peer;
34         pthread_t tid;
35         pthread_cond_t cond;
36         pthread_mutex_t lock;
37         void (*func)(void *arg);
38         void *arg;
39 };
40
41
42 inline static struct thread* alloc_thread(struct peerd *peer)
43 {
44         xqindex idx = xq_pop_head(&peer->threads, 1);
45         if (idx == Noneidx)
46                 return NULL;
47         return peer->thread + idx;
48 }
49
50 inline static void free_thread(struct peerd *peer, struct thread *t)
51 {
52         xqindex idx = t - peer->thread;
53         xq_append_head(&peer->threads, idx, 1);
54 }
55
56
57 inline static void __wake_up_thread(struct thread *t)
58 {
59         pthread_mutex_lock(&t->lock);
60         pthread_cond_signal(&t->cond);
61         pthread_mutex_unlock(&t->lock);
62 }
63
64 inline static void wake_up_thread(struct thread* t)
65 {
66         if (t){
67                 __wake_up_thread(t);
68         }
69 }
70
71 inline static int wake_up_next_thread(struct peerd *peer)
72 {
73         return (xseg_signal(peer->xseg, peer->portno_start));
74 }
75 #endif
76
77
78 static inline int isTerminate()
79 {
80 /* ta doesn't need to be taken into account, because the main loops
81  * doesn't check the terminated flag if ta is not 0.
82  *
83 #ifdef ST_THREADS
84         return (!ta & terminated);
85 #else
86         return terminated;
87 #endif
88         */
89         return terminated;
90 }
91
92 void signal_handler(int signal)
93 {
94         XSEGLOG2(&lc, I, "Caught signal. Terminating gracefully");
95         terminated = 1;
96 }
97
98 static int setup_signals()
99 {
100         int r;
101         struct sigaction sa;
102         sigemptyset(&sa.sa_mask);
103         sa.sa_flags = 0;
104         sa.sa_handler = signal_handler;
105         r = sigaction(SIGTERM, &sa, NULL);
106         if (r < 0)
107                 return r;
108         r = sigaction(SIGINT, &sa, NULL);
109         if (r < 0)
110                 return r;
111         r = sigaction(SIGQUIT, &sa, NULL);
112         if (r < 0)
113                 return r;
114         return r;
115 }
116
117 inline int canDefer(struct peerd *peer)
118 {
119         return !(peer->defer_portno == NoPort);
120 }
121
122 void print_req(struct xseg *xseg, struct xseg_request *req)
123 {
124         char target[64], data[64];
125         char *req_target, *req_data;
126         unsigned int end = (req->targetlen> 63) ? 63 : req->targetlen;
127         req_target = xseg_get_target(xseg, req);
128         req_data = xseg_get_data(xseg, req);
129
130         if (1) {
131                 strncpy(target, req_target, end);
132                 target[end] = 0;
133                 strncpy(data, req_data, 63);
134                 data[63] = 0;
135                 printf("req id:%lu, op:%u %llu:%lu serviced: %lu, reqstate: %u\n"
136                                 "src: %u, st: %u, dst: %u dt: %u\n"
137                                 "target[%u]:'%s', data[%llu]:\n%s------------------\n\n",
138                                 (unsigned long)(req),
139                                 (unsigned int)req->op,
140                                 (unsigned long long)req->offset,
141                                 (unsigned long)req->size,
142                                 (unsigned long)req->serviced,
143                                 (unsigned int)req->state,
144                                 (unsigned int)req->src_portno,
145                                 (unsigned int)req->src_transit_portno,
146                                 (unsigned int)req->dst_portno,
147                                 (unsigned int)req->dst_transit_portno,
148                                 (unsigned int)req->targetlen, target,
149                                 (unsigned long long)req->datalen, data);
150         }
151 }
152 void log_pr(char *msg, struct peer_req *pr)
153 {
154         char target[64], data[64];
155         char *req_target, *req_data;
156         struct peerd *peer = pr->peer;
157         struct xseg *xseg = pr->peer->xseg;
158         req_target = xseg_get_target(xseg, pr->req);
159         req_data = xseg_get_data(xseg, pr->req);
160         /* null terminate name in case of req->target is less than 63 characters,
161          * and next character after name (aka first byte of next buffer) is not
162          * null
163          */
164         unsigned int end = (pr->req->targetlen> 63) ? 63 : pr->req->targetlen;
165         if (verbose) {
166                 strncpy(target, req_target, end);
167                 target[end] = 0;
168                 strncpy(data, req_data, 63);
169                 data[63] = 0;
170                 printf("%s: req id:%u, op:%u %llu:%lu serviced: %lu, retval: %lu, reqstate: %u\n"
171                                 "target[%u]:'%s', data[%llu]:\n%s------------------\n\n",
172                                 msg,
173                                 (unsigned int)(pr - peer->peer_reqs),
174                                 (unsigned int)pr->req->op,
175                                 (unsigned long long)pr->req->offset,
176                                 (unsigned long)pr->req->size,
177                                 (unsigned long)pr->req->serviced,
178                                 (unsigned long)pr->retval,
179                                 (unsigned int)pr->req->state,
180                                 (unsigned int)pr->req->targetlen, target,
181                                 (unsigned long long)pr->req->datalen, data);
182         }
183 }
184
185 inline struct peer_req *alloc_peer_req(struct peerd *peer)
186 {
187         xqindex idx = xq_pop_head(&peer->free_reqs, 1);
188         if (idx == Noneidx)
189                 return NULL;
190         return peer->peer_reqs + idx;
191 }
192
193 inline void free_peer_req(struct peerd *peer, struct peer_req *pr)
194 {
195         xqindex idx = pr - peer->peer_reqs;
196         pr->req = NULL;
197         xq_append_head(&peer->free_reqs, idx, 1);
198 }
199
200 struct timeval resp_start, resp_end, resp_accum = {0, 0};
201 uint64_t responds = 0;
202 void get_responds_stats(){
203                 printf("Time waiting respond %lu.%06lu sec for %llu times.\n",
204                                 //(unsigned int)(t - peer->thread),
205                                 resp_accum.tv_sec, resp_accum.tv_usec, (long long unsigned int) responds);
206 }
207
208 //FIXME error check
209 void fail(struct peerd *peer, struct peer_req *pr)
210 {
211         struct xseg_request *req = pr->req;
212         uint32_t p;
213         XSEGLOG2(&lc, D, "failing req %u", (unsigned int) (pr - peer->peer_reqs));
214         req->state |= XS_FAILED;
215         //xseg_set_req_data(peer->xseg, pr->req, NULL);
216         p = xseg_respond(peer->xseg, req, pr->portno, X_ALLOC);
217         xseg_signal(peer->xseg, p);
218         free_peer_req(peer, pr);
219 #ifdef MT
220         wake_up_next_thread(peer);
221 #endif
222 }
223
224 //FIXME error check
225 void complete(struct peerd *peer, struct peer_req *pr)
226 {
227         struct xseg_request *req = pr->req;
228         uint32_t p;
229         req->state |= XS_SERVED;
230         //xseg_set_req_data(peer->xseg, pr->req, NULL);
231         //gettimeofday(&resp_start, NULL);
232         p = xseg_respond(peer->xseg, req, pr->portno, X_ALLOC);
233         //gettimeofday(&resp_end, NULL);
234         //responds++;
235         //timersub(&resp_end, &resp_start, &resp_end);
236         //timeradd(&resp_end, &resp_accum, &resp_accum);
237         //printf("xseg_signal: %u\n", p);
238         xseg_signal(peer->xseg, p);
239         free_peer_req(peer, pr);
240 #ifdef MT
241         wake_up_next_thread(peer);
242 #endif
243 }
244
245 static void handle_accepted(struct peerd *peer, struct peer_req *pr, 
246                                 struct xseg_request *req)
247 {
248         struct xseg_request *xreq = pr->req;
249         //assert xreq == req;
250         XSEGLOG2(&lc, D, "Handle accepted");
251         xreq->serviced = 0;
252         //xreq->state = XS_ACCEPTED;
253         pr->retval = 0;
254         dispatch(peer, pr, req, dispatch_accept);
255 }
256
257 static void handle_received(struct peerd *peer, struct peer_req *pr,
258                                 struct xseg_request *req)
259 {
260         //struct xseg_request *req = pr->req;
261         //assert req->state != XS_ACCEPTED;
262         XSEGLOG2(&lc, D, "Handle received \n");
263         dispatch(peer, pr, req, dispatch_receive);
264
265 }
266 struct timeval sub_start, sub_end, sub_accum = {0, 0};
267 uint64_t submits = 0;
268 void get_submits_stats(){
269                 printf("Time waiting submit %lu.%06lu sec for %llu times.\n",
270                                 //(unsigned int)(t - peer->thread),
271                                 sub_accum.tv_sec, sub_accum.tv_usec, (long long unsigned int) submits);
272 }
273
274 int submit_peer_req(struct peerd *peer, struct peer_req *pr)
275 {
276         uint32_t ret;
277         struct xseg_request *req = pr->req;
278         // assert req->portno == peer->portno ?
279         //TODO small function with error checking
280         XSEGLOG2 (&lc, D, "submitting peer req %u\n", (unsigned int)(pr - peer->peer_reqs));
281         ret = xseg_set_req_data(peer->xseg, req, (void *)(pr));
282         if (ret < 0)
283                 return -1;
284         //printf("pr: %x , req_data: %x \n", pr, xseg_get_req_data(peer->xseg, req));
285         //gettimeofday(&sub_start, NULL);
286         ret = xseg_submit(peer->xseg, req, pr->portno, X_ALLOC);
287         //gettimeofday(&sub_end, NULL);
288         //submits++;
289         //timersub(&sub_end, &sub_start, &sub_end);
290         //timeradd(&sub_end, &sub_accum, &sub_accum);
291         if (ret == NoPort)
292                 return -1;
293         xseg_signal(peer->xseg, ret);
294         return 0;
295 }
296
297 static int check_ports(struct peerd *peer)
298 {
299         struct xseg *xseg = peer->xseg;
300         xport portno_start = peer->portno_start;
301         xport portno_end = peer->portno_end;
302         struct xseg_request *accepted, *received;
303         struct peer_req *pr;
304         xport i;
305         int  r, c = 0;
306
307         for (i = portno_start; i <= portno_end; i++) {
308                 accepted = NULL;
309                 received = NULL;
310                 if (!isTerminate()) {
311                         pr = alloc_peer_req(peer);
312                         if (pr) {
313                                 accepted = xseg_accept(xseg, i, X_NONBLOCK);
314                                 if (accepted) {
315                                         pr->req = accepted;
316                                         pr->portno = i;
317                                         xseg_cancel_wait(xseg, i);
318                                         handle_accepted(peer, pr, accepted);
319                                         c = 1;
320                                 }
321                                 else {
322                                         free_peer_req(peer, pr);
323                                 }
324                         }
325                 }
326                 received = xseg_receive(xseg, i, X_NONBLOCK);
327                 if (received) {
328                         r =  xseg_get_req_data(xseg, received, (void **) &pr);
329                         if (r < 0 || !pr){
330                                 XSEGLOG2(&lc, W, "Received request with no pr data\n");
331                                 xport p = xseg_respond(peer->xseg, received, peer->portno_start, X_ALLOC);
332                                 if (p == NoPort){
333                                         XSEGLOG2(&lc, W, "Could not respond stale request");
334                                         xseg_put_request(xseg, received, portno_start);
335                                         continue;
336                                 } else {
337                                         xseg_signal(xseg, p);
338                                 }
339                         } else {
340                                 //maybe perform sanity check for pr
341                                 xseg_cancel_wait(xseg, i);
342                                 handle_received(peer, pr, received);
343                                 c = 1;
344                         }
345                 }
346         }
347
348         return c;
349 }
350
351 #ifdef MT
352 int thread_execute(struct peerd *peer, void (*func)(void *arg), void *arg)
353 {
354         struct thread *t = alloc_thread(peer);
355         if (t) {
356                 t->func = func;
357                 t->arg = arg;
358                 wake_up_thread(t);
359                 return 0;
360         } else
361                 // we could hijack a thread
362                 return -1;
363 }
364
365 static void* thread_loop(void *arg)
366 {
367         struct thread *t = (struct thread *) arg;
368         struct peerd *peer = t->peer;
369         struct xseg *xseg = peer->xseg;
370         xport portno_start = peer->portno_start;
371         xport portno_end = peer->portno_end;
372         pid_t pid =syscall(SYS_gettid);
373         uint64_t loops;
374         uint64_t threshold=1000/(1 + portno_end - portno_start);
375         
376         XSEGLOG2(&lc, D, "thread %u\n",  (unsigned int) (t- peer->thread));
377
378         XSEGLOG2(&lc, I, "Thread %u has tid %u.\n", (unsigned int) (t- peer->thread), pid);
379         xseg_init_local_signal(xseg, peer->portno_start);
380         for (;!(isTerminate() && xq_count(&peer->free_reqs) == peer->nr_ops);) {
381                 if (t->func) {
382                         XSEGLOG2(&lc, D, "Thread %u executes function\n", (unsigned int) (t- peer->thread));
383                         xseg_cancel_wait(xseg, peer->portno_start);
384                         t->func(t->arg);
385                         t->func = NULL;
386                         t->arg = NULL;
387                         continue;
388                 }
389
390                 for(loops= threshold; loops > 0; loops--) {
391                         if (loops == 1)
392                                 xseg_prepare_wait(xseg, peer->portno_start);
393                         if (check_ports(peer))
394                                 loops = threshold;
395                 }
396                 XSEGLOG2(&lc, I, "Thread %u goes to sleep\n", (unsigned int) (t- peer->thread));
397                 xseg_wait_signal(xseg, 10000000UL);
398                 xseg_cancel_wait(xseg, peer->portno_start);
399                 XSEGLOG2(&lc, I, "Thread %u woke up\n", (unsigned int) (t- peer->thread));
400         }
401         wake_up_next_thread(peer);
402         return NULL;
403 }
404
405 int peerd_start_threads(struct peerd *peer)
406 {
407         int i;
408         uint32_t nr_threads = peer->nr_threads;
409         //TODO err check
410         for (i = 0; i < nr_threads; i++) {
411                 peer->thread[i].peer = peer;
412                 pthread_cond_init(&peer->thread[i].cond,NULL);
413                 pthread_mutex_init(&peer->thread[i].lock, NULL);
414                 pthread_create(&peer->thread[i].tid, NULL, thread_loop, (void *)(peer->thread + i));
415                 peer->thread[i].func = NULL;
416                 peer->thread[i].arg = NULL;
417
418         }
419         return 0;
420 }
421 #endif
422
423 void defer_request(struct peerd *peer, struct peer_req *pr)
424 {
425         // assert canDefer(peer);
426 //      xseg_submit(peer->xseg, peer->defer_portno, pr->req);
427 //      xseg_signal(peer->xseg, peer->defer_portno);
428 //      free_peer_req(peer, pr);
429 }
430
431 static int peerd_loop(struct peerd *peer) 
432 {
433 #ifdef MT
434         int i;
435         if (peer->interactive_func)
436                 peer->interactive_func();
437         for (i = 0; i < peer->nr_threads; i++) {
438                 pthread_join(peer->thread[i].tid, NULL);
439         }
440 #else
441         struct xseg *xseg = peer->xseg;
442         xport portno_start = peer->portno_start;
443         xport portno_end = peer->portno_end;
444         uint64_t threshold=1000/(1 + portno_end - portno_start);
445         pid_t pid =syscall(SYS_gettid);
446         uint64_t loops;
447         
448         XSEGLOG2(&lc, I, "Peer has tid %u.\n", pid);
449         xseg_init_local_signal(xseg, peer->portno_start);
450         for (;!(isTerminate() && xq_count(&peer->free_reqs) == peer->nr_ops);) {
451                 for(loops= threshold; loops > 0; loops--) {
452                         if (loops == 1)
453                                 xseg_prepare_wait(xseg, peer->portno_start);
454                         if (check_ports(peer))
455                                 loops = threshold;
456                 }
457 #ifdef ST_THREADS
458                 if (ta){
459                         st_sleep(0);
460                 } else {
461 #endif
462                         XSEGLOG2(&lc, I, "Peer goes to sleep\n");
463                         xseg_wait_signal(xseg, 10000000UL);
464                         xseg_cancel_wait(xseg, peer->portno_start);
465                         XSEGLOG2(&lc, I, "Peer woke up\n");
466 #ifdef ST_THREADS
467                 }
468 #endif
469         }
470         xseg_quit_local_signal(xseg, peer->portno_start);
471 #endif
472         return 0;
473 }
474
475 static struct xseg *join(char *spec)
476 {
477         struct xseg_config config;
478         struct xseg *xseg;
479
480         (void)xseg_parse_spec(spec, &config);
481         xseg = xseg_join(config.type, config.name, PEER_TYPE, NULL);
482         if (xseg)
483                 return xseg;
484
485         (void)xseg_create(&config);
486         return xseg_join(config.type, config.name, PEER_TYPE, NULL);
487 }
488
489 static struct peerd* peerd_init(uint32_t nr_ops, char* spec, long portno_start,
490                         long portno_end, uint32_t nr_threads, uint32_t defer_portno)
491 {
492         int i;
493         struct peerd *peer;
494         struct xseg_port *port;
495
496 #ifdef ST_THREADS
497         st_init();
498 #endif
499         peer = malloc(sizeof(struct peerd));
500         if (!peer) {
501                 perror("malloc");
502                 return NULL;
503         }
504         peer->nr_ops = nr_ops;
505         peer->defer_portno = defer_portno;
506 #ifdef MT
507         peer->nr_threads = nr_threads;
508         peer->thread = calloc(nr_threads, sizeof(struct thread));
509         if (!peer->thread)
510                 goto malloc_fail;
511 #endif
512         peer->peer_reqs = calloc(nr_ops, sizeof(struct peer_req));
513         if (!peer->peer_reqs){
514 malloc_fail:
515                 perror("malloc");
516                 return NULL;
517         }
518
519         if (!xq_alloc_seq(&peer->free_reqs, nr_ops, nr_ops))
520                 goto malloc_fail;
521 #ifdef MT
522         if (!xq_alloc_empty(&peer->threads, nr_threads))
523                 goto malloc_fail;
524 #endif
525         if (xseg_initialize()){
526                 printf("cannot initialize library\n");
527                 return NULL;
528         }
529         peer->xseg = join(spec);
530         if (!peer->xseg) 
531                 return NULL;
532
533         peer->portno_start = (xport) portno_start;
534         peer->portno_end= (xport) portno_end;
535         port = xseg_bind_port(peer->xseg, peer->portno_start, NULL);
536         if (!port){
537                 printf("cannot bind to port %u\n", (unsigned int) peer->portno_start);
538                 return NULL;
539         }
540
541         xport p;
542         for (p = peer->portno_start + 1; p <= peer->portno_end; p++) {
543                 struct xseg_port *tmp;
544                 tmp = xseg_bind_port(peer->xseg, p, (void *)xseg_get_signal_desc(peer->xseg, port));
545                 if (!tmp){
546                         printf("cannot bind to port %u\n", (unsigned int) p);
547                         return NULL;
548                 }
549         }
550
551         printf("Peer on ports  %u-%u\n", peer->portno_start,
552                         peer->portno_end);
553
554         for (i = 0; i < nr_ops; i++) {
555                 peer->peer_reqs[i].peer = peer;
556                 peer->peer_reqs[i].req = NULL;
557                 peer->peer_reqs[i].retval = 0;
558                 peer->peer_reqs[i].priv = NULL;
559                 peer->peer_reqs[i].portno = NoPort;
560 #ifdef ST_THREADS
561                 peer->peer_reqs[i].cond = st_cond_new(); //FIXME err check
562 #endif
563         }
564 #ifdef MT
565         peer->interactive_func = NULL;
566 #endif
567         return peer;
568 }
569
570 int pidfile_remove(char *path, int fd)
571 {
572         close(fd);
573         return (unlink(path));
574 }
575
576 int pidfile_write(int pid_fd)
577 {
578         char buf[16];
579         snprintf(buf, sizeof(buf), "%ld", syscall(SYS_gettid));
580         buf[15] = 0;
581         
582         lseek(pid_fd, 0, SEEK_SET);
583         int ret = write(pid_fd, buf, strlen(buf));
584         return ret;
585 }
586
587 int pidfile_read(char *path, pid_t *pid)
588 {
589         char buf[16], *endptr;
590         *pid = 0;
591
592         int fd = open(path, O_RDONLY);
593         if (fd < 0)
594                 return -1;
595         int ret = read(fd, buf, 15);
596         buf[15]=0;
597         close(fd);
598         if (ret < 0)
599                 return -1;
600         else{
601                 *pid = strtol(buf, &endptr, 10);
602                 if (endptr != &buf[ret]){
603                         *pid = 0;
604                         return -1;
605                 }
606         }
607         return 0;
608 }
609
610 int pidfile_open(char *path, pid_t *old_pid)
611 {
612         //nfs version > 3
613         int fd = open(path, O_CREAT|O_EXCL|O_WRONLY);
614         if (fd < 0){
615                 if (errno == -EEXIST)
616                         pidfile_read(path, old_pid);
617         }
618         return fd;
619 }
620
621 int main(int argc, char *argv[])
622 {
623         struct peerd *peer = NULL;
624         //parse args
625         char *spec = "";
626         int i, r, daemonize = 0;
627         long portno_start = -1, portno_end = -1, portno = -1;
628         //set defaults here
629         uint32_t nr_ops = 16;
630         uint32_t nr_threads = 16 ;
631         unsigned int debug_level = 0;
632         uint32_t defer_portno = NoPort;
633         char *logfile = NULL;
634         char *pidfile = NULL;
635         pid_t old_pid;
636         int pid_fd = -1;
637
638         //capture here -g spec, -n nr_ops, -p portno, -t nr_threads -v verbose level
639         // -dp xseg_portno to defer blocking requests
640         // -l log file ?
641         //TODO print messages on arg parsing error
642         
643         for (i = 1; i < argc; i++) {
644                 if (!strcmp(argv[i], "-g") && i + 1 < argc) {
645                         spec = argv[i+1];
646                         i += 1;
647                         continue;
648                 }
649
650                 if (!strcmp(argv[i], "-sp") && i + 1 < argc) {
651                         portno_start = strtoul(argv[i+1], NULL, 10);
652                         i += 1;
653                         continue;
654                 }
655                 
656                 if (!strcmp(argv[i], "-ep") && i + 1 < argc) {
657                         portno_end = strtoul(argv[i+1], NULL, 10);
658                         i += 1;
659                         continue;
660                 }
661
662                 if (!strcmp(argv[i], "-p") && i + 1 < argc) {
663                         portno = strtoul(argv[i+1], NULL, 10);
664                         i += 1;
665                         continue;
666                 }
667
668                 if (!strcmp(argv[i], "-n") && i + 1 < argc) {
669                         nr_ops = strtoul(argv[i+1], NULL, 10);
670                         i += 1;
671                         continue;
672                 }
673                 if (!strcmp(argv[i], "-v") && i + 1 < argc ) {
674                         debug_level = atoi(argv[i+1]);
675                         i += 1;
676                         continue;
677                 }
678                 if (!strcmp(argv[i], "-t") && i + 1 < argc ) {
679                         nr_threads = strtoul(argv[i+1], NULL, 10);
680                         i += 1;
681                         continue;
682                 }
683                 if (!strcmp(argv[i], "-dp") && i + 1 < argc ) {
684                         defer_portno = strtoul(argv[i+1], NULL, 10);
685                         i += 1;
686                         continue;
687                 }
688                 if (!strcmp(argv[i], "-l") && i + 1 < argc ) {
689                         logfile = argv[i+1];
690                         i += 1;
691                         continue;
692                 }
693                 if (!strcmp(argv[i], "-d")) {
694                         daemonize = 1;
695                         continue;
696                 }
697                 if (!strcmp(argv[i], "--pidfile") && i + 1 < argc ) {
698                         pidfile = argv[i+1];
699                         i += 1;
700                         continue;
701                 }
702
703         }
704         init_logctx(&lc, argv[0], debug_level, logfile);
705         XSEGLOG2(&lc, D, "Main thread has tid %ld.\n", syscall(SYS_gettid));
706         
707         if (pidfile){
708                 pid_fd = pidfile_open(pidfile, &old_pid);
709                 if (pid_fd < 0) {
710                         if (old_pid) {
711                                 XSEGLOG2(&lc, E, "Daemon already running, pid: %d.", old_pid);
712                         } else {
713                                 XSEGLOG2(&lc, E, "Cannot open or create pidfile");
714                         }
715                         return -1;
716                 }
717         }
718         
719         if (daemonize){
720                 if (daemon(0, 1) < 0){
721                         XSEGLOG2(&lc, E, "Cannot daemonize");
722                         r = -1;
723                         goto out;
724                 }
725         }
726
727         pidfile_write(pid_fd);
728         
729         //TODO perform argument sanity checks
730         verbose = debug_level;
731         if (portno != -1) {
732                 portno_start = portno;
733                 portno_end = portno;
734         }
735
736         setup_signals();
737         //TODO err check
738         peer = peerd_init(nr_ops, spec, portno_start, portno_end, nr_threads, defer_portno);
739         if (!peer){
740                 r = -1;
741                 goto out;
742         }
743         r = custom_peer_init(peer, argc, argv);
744         if (r < 0)
745                 goto out;
746 #ifdef MT
747         peerd_start_threads(peer);
748 #endif
749
750 #ifdef ST_THREADS
751         st_thread_t st = st_thread_create(peerd_loop, peer, 1, 0);
752         r = st_thread_join(st, NULL);
753 #else
754         r = peerd_loop(peer);
755 #endif
756 out:
757         if (pid_fd > 0)
758                 pidfile_remove(pidfile, pid_fd);
759         return r;
760 }