add improved argument parsing. also add helper messages
[archipelago] / xseg / peers / user / peer.c
1 #define _GNU_SOURCE
2 #include <stdio.h>
3 #include <stdlib.h>
4 #include <sys/types.h>
5 #include <unistd.h>
6 #include <sys/syscall.h>
7 #include <sys/time.h>
8 #include <signal.h>
9 #include <sys/stat.h>
10 #include <fcntl.h>
11 #include <errno.h>
12
13 #ifdef MT
14 #include <pthread.h>
15 #endif
16
17 #include <xseg/xseg.h>
18 #include <peer.h>
19
20 #ifdef MT
21 #define PEER_TYPE "pthread"
22 #else
23 #define PEER_TYPE "posix"
24 #endif
25
26 //FIXME this should not be defined here probably
27 #define MAX_SPEC_LEN 128
28 #define MAX_PIDFILE_LEN 512
29
30 volatile unsigned int terminated = 0;
31 unsigned int verbose = 0;
32 struct log_ctx lc;
33 #ifdef ST_THREADS
34 uint32_t ta = 0;
35 #endif
36
37 #ifdef MT
38 struct peerd *global_peer;
39
40 struct thread {
41         struct peerd *peer;
42         pthread_t tid;
43         pthread_cond_t cond;
44         pthread_mutex_t lock;
45         void (*func)(void *arg);
46         void *arg;
47 };
48
49
50 inline static struct thread* alloc_thread(struct peerd *peer)
51 {
52         xqindex idx = xq_pop_head(&peer->threads, 1);
53         if (idx == Noneidx)
54                 return NULL;
55         return peer->thread + idx;
56 }
57
58 inline static void free_thread(struct peerd *peer, struct thread *t)
59 {
60         xqindex idx = t - peer->thread;
61         xq_append_head(&peer->threads, idx, 1);
62 }
63
64
65 inline static void __wake_up_thread(struct thread *t)
66 {
67         pthread_mutex_lock(&t->lock);
68         pthread_cond_signal(&t->cond);
69         pthread_mutex_unlock(&t->lock);
70 }
71
72 inline static void wake_up_thread(struct thread* t)
73 {
74         if (t){
75                 __wake_up_thread(t);
76         }
77 }
78
79 inline static int wake_up_next_thread(struct peerd *peer)
80 {
81         return (xseg_signal(peer->xseg, peer->portno_start));
82 }
83 #endif
84
85
86 static inline int isTerminate()
87 {
88 /* ta doesn't need to be taken into account, because the main loops
89  * doesn't check the terminated flag if ta is not 0.
90  */
91         /*
92 #ifdef ST_THREADS
93         return (!ta & terminated);
94 #else
95         return terminated;
96 #endif
97         */
98         return terminated;
99 }
100
101 void signal_handler(int signal)
102 {
103         XSEGLOG2(&lc, I, "Caught signal. Terminating gracefully");
104         terminated = 1;
105 #ifdef MT
106         wake_up_next_thread(global_peer);
107 #endif
108 }
109
110 void renew_logfile(int signal)
111 {
112         XSEGLOG2(&lc, I, "Caught signal. Renewing logfile");
113         renew_logctx(&lc, NULL, verbose, NULL, REOPEN_FILE);
114 }
115
116 static int setup_signals(struct peerd *peer)
117 {
118         int r;
119         struct sigaction sa;
120 #ifdef MT
121         global_peer = peer;
122 #endif
123         sigemptyset(&sa.sa_mask);
124         sa.sa_flags = 0;
125         sa.sa_handler = signal_handler;
126         r = sigaction(SIGTERM, &sa, NULL);
127         if (r < 0)
128                 return r;
129         r = sigaction(SIGINT, &sa, NULL);
130         if (r < 0)
131                 return r;
132         r = sigaction(SIGQUIT, &sa, NULL);
133         if (r < 0)
134                 return r;
135
136         sa.sa_handler = renew_logfile;
137         r = sigaction(SIGUSR1, &sa, NULL);
138         if (r < 0)
139                 return r;
140
141         return r;
142 }
143
144 inline int canDefer(struct peerd *peer)
145 {
146         return !(peer->defer_portno == NoPort);
147 }
148
149 void print_req(struct xseg *xseg, struct xseg_request *req)
150 {
151         char target[64], data[64];
152         char *req_target, *req_data;
153         unsigned int end = (req->targetlen> 63) ? 63 : req->targetlen;
154         req_target = xseg_get_target(xseg, req);
155         req_data = xseg_get_data(xseg, req);
156
157         if (1) {
158                 strncpy(target, req_target, end);
159                 target[end] = 0;
160                 strncpy(data, req_data, 63);
161                 data[63] = 0;
162                 printf("req id:%lu, op:%u %llu:%lu serviced: %lu, reqstate: %u\n"
163                                 "src: %u, st: %u, dst: %u dt: %u\n"
164                                 "target[%u]:'%s', data[%llu]:\n%s------------------\n\n",
165                                 (unsigned long)(req),
166                                 (unsigned int)req->op,
167                                 (unsigned long long)req->offset,
168                                 (unsigned long)req->size,
169                                 (unsigned long)req->serviced,
170                                 (unsigned int)req->state,
171                                 (unsigned int)req->src_portno,
172                                 (unsigned int)req->src_transit_portno,
173                                 (unsigned int)req->dst_portno,
174                                 (unsigned int)req->dst_transit_portno,
175                                 (unsigned int)req->targetlen, target,
176                                 (unsigned long long)req->datalen, data);
177         }
178 }
179 void log_pr(char *msg, struct peer_req *pr)
180 {
181         char target[64], data[64];
182         char *req_target, *req_data;
183         struct peerd *peer = pr->peer;
184         struct xseg *xseg = pr->peer->xseg;
185         req_target = xseg_get_target(xseg, pr->req);
186         req_data = xseg_get_data(xseg, pr->req);
187         /* null terminate name in case of req->target is less than 63 characters,
188          * and next character after name (aka first byte of next buffer) is not
189          * null
190          */
191         unsigned int end = (pr->req->targetlen> 63) ? 63 : pr->req->targetlen;
192         if (verbose) {
193                 strncpy(target, req_target, end);
194                 target[end] = 0;
195                 strncpy(data, req_data, 63);
196                 data[63] = 0;
197                 printf("%s: req id:%u, op:%u %llu:%lu serviced: %lu, retval: %lu, reqstate: %u\n"
198                                 "target[%u]:'%s', data[%llu]:\n%s------------------\n\n",
199                                 msg,
200                                 (unsigned int)(pr - peer->peer_reqs),
201                                 (unsigned int)pr->req->op,
202                                 (unsigned long long)pr->req->offset,
203                                 (unsigned long)pr->req->size,
204                                 (unsigned long)pr->req->serviced,
205                                 (unsigned long)pr->retval,
206                                 (unsigned int)pr->req->state,
207                                 (unsigned int)pr->req->targetlen, target,
208                                 (unsigned long long)pr->req->datalen, data);
209         }
210 }
211
212 inline struct peer_req *alloc_peer_req(struct peerd *peer)
213 {
214         xqindex idx = xq_pop_head(&peer->free_reqs, 1);
215         if (idx == Noneidx)
216                 return NULL;
217         return peer->peer_reqs + idx;
218 }
219
220 inline void free_peer_req(struct peerd *peer, struct peer_req *pr)
221 {
222         xqindex idx = pr - peer->peer_reqs;
223         pr->req = NULL;
224         xq_append_head(&peer->free_reqs, idx, 1);
225 }
226
227 struct timeval resp_start, resp_end, resp_accum = {0, 0};
228 uint64_t responds = 0;
229 void get_responds_stats(){
230                 printf("Time waiting respond %lu.%06lu sec for %llu times.\n",
231                                 //(unsigned int)(t - peer->thread),
232                                 resp_accum.tv_sec, resp_accum.tv_usec, (long long unsigned int) responds);
233 }
234
235 //FIXME error check
236 void fail(struct peerd *peer, struct peer_req *pr)
237 {
238         struct xseg_request *req = pr->req;
239         uint32_t p;
240         XSEGLOG2(&lc, D, "failing req %u", (unsigned int) (pr - peer->peer_reqs));
241         req->state |= XS_FAILED;
242         //xseg_set_req_data(peer->xseg, pr->req, NULL);
243         p = xseg_respond(peer->xseg, req, pr->portno, X_ALLOC);
244         xseg_signal(peer->xseg, p);
245         free_peer_req(peer, pr);
246 #ifdef MT
247         wake_up_next_thread(peer);
248 #endif
249 }
250
251 //FIXME error check
252 void complete(struct peerd *peer, struct peer_req *pr)
253 {
254         struct xseg_request *req = pr->req;
255         uint32_t p;
256         req->state |= XS_SERVED;
257         //xseg_set_req_data(peer->xseg, pr->req, NULL);
258         //gettimeofday(&resp_start, NULL);
259         p = xseg_respond(peer->xseg, req, pr->portno, X_ALLOC);
260         //gettimeofday(&resp_end, NULL);
261         //responds++;
262         //timersub(&resp_end, &resp_start, &resp_end);
263         //timeradd(&resp_end, &resp_accum, &resp_accum);
264         //printf("xseg_signal: %u\n", p);
265         xseg_signal(peer->xseg, p);
266         free_peer_req(peer, pr);
267 #ifdef MT
268         wake_up_next_thread(peer);
269 #endif
270 }
271
272 static void handle_accepted(struct peerd *peer, struct peer_req *pr, 
273                                 struct xseg_request *req)
274 {
275         struct xseg_request *xreq = pr->req;
276         //assert xreq == req;
277         XSEGLOG2(&lc, D, "Handle accepted");
278         xreq->serviced = 0;
279         //xreq->state = XS_ACCEPTED;
280         pr->retval = 0;
281         dispatch(peer, pr, req, dispatch_accept);
282 }
283
284 static void handle_received(struct peerd *peer, struct peer_req *pr,
285                                 struct xseg_request *req)
286 {
287         //struct xseg_request *req = pr->req;
288         //assert req->state != XS_ACCEPTED;
289         XSEGLOG2(&lc, D, "Handle received \n");
290         dispatch(peer, pr, req, dispatch_receive);
291
292 }
293 struct timeval sub_start, sub_end, sub_accum = {0, 0};
294 uint64_t submits = 0;
295 void get_submits_stats(){
296                 printf("Time waiting submit %lu.%06lu sec for %llu times.\n",
297                                 //(unsigned int)(t - peer->thread),
298                                 sub_accum.tv_sec, sub_accum.tv_usec, (long long unsigned int) submits);
299 }
300
301 int submit_peer_req(struct peerd *peer, struct peer_req *pr)
302 {
303         uint32_t ret;
304         struct xseg_request *req = pr->req;
305         // assert req->portno == peer->portno ?
306         //TODO small function with error checking
307         XSEGLOG2 (&lc, D, "submitting peer req %u\n", (unsigned int)(pr - peer->peer_reqs));
308         ret = xseg_set_req_data(peer->xseg, req, (void *)(pr));
309         if (ret < 0)
310                 return -1;
311         //printf("pr: %x , req_data: %x \n", pr, xseg_get_req_data(peer->xseg, req));
312         //gettimeofday(&sub_start, NULL);
313         ret = xseg_submit(peer->xseg, req, pr->portno, X_ALLOC);
314         //gettimeofday(&sub_end, NULL);
315         //submits++;
316         //timersub(&sub_end, &sub_start, &sub_end);
317         //timeradd(&sub_end, &sub_accum, &sub_accum);
318         if (ret == NoPort)
319                 return -1;
320         xseg_signal(peer->xseg, ret);
321         return 0;
322 }
323
324 static int check_ports(struct peerd *peer)
325 {
326         struct xseg *xseg = peer->xseg;
327         xport portno_start = peer->portno_start;
328         xport portno_end = peer->portno_end;
329         struct xseg_request *accepted, *received;
330         struct peer_req *pr;
331         xport i;
332         int  r, c = 0;
333
334         for (i = portno_start; i <= portno_end; i++) {
335                 accepted = NULL;
336                 received = NULL;
337                 if (!isTerminate()) {
338                         pr = alloc_peer_req(peer);
339                         if (pr) {
340                                 accepted = xseg_accept(xseg, i, X_NONBLOCK);
341                                 if (accepted) {
342                                         pr->req = accepted;
343                                         pr->portno = i;
344                                         xseg_cancel_wait(xseg, i);
345                                         handle_accepted(peer, pr, accepted);
346                                         c = 1;
347                                 }
348                                 else {
349                                         free_peer_req(peer, pr);
350                                 }
351                         }
352                 }
353                 received = xseg_receive(xseg, i, X_NONBLOCK);
354                 if (received) {
355                         r =  xseg_get_req_data(xseg, received, (void **) &pr);
356                         if (r < 0 || !pr){
357                                 XSEGLOG2(&lc, W, "Received request with no pr data\n");
358                                 xport p = xseg_respond(peer->xseg, received, peer->portno_start, X_ALLOC);
359                                 if (p == NoPort){
360                                         XSEGLOG2(&lc, W, "Could not respond stale request");
361                                         xseg_put_request(xseg, received, portno_start);
362                                         continue;
363                                 } else {
364                                         xseg_signal(xseg, p);
365                                 }
366                         } else {
367                                 //maybe perform sanity check for pr
368                                 xseg_cancel_wait(xseg, i);
369                                 handle_received(peer, pr, received);
370                                 c = 1;
371                         }
372                 }
373         }
374
375         return c;
376 }
377
378 #ifdef MT
379 int thread_execute(struct peerd *peer, void (*func)(void *arg), void *arg)
380 {
381         struct thread *t = alloc_thread(peer);
382         if (t) {
383                 t->func = func;
384                 t->arg = arg;
385                 wake_up_thread(t);
386                 return 0;
387         } else
388                 // we could hijack a thread
389                 return -1;
390 }
391
392 static void* thread_loop(void *arg)
393 {
394         struct thread *t = (struct thread *) arg;
395         struct peerd *peer = t->peer;
396         struct xseg *xseg = peer->xseg;
397         xport portno_start = peer->portno_start;
398         xport portno_end = peer->portno_end;
399         pid_t pid =syscall(SYS_gettid);
400         uint64_t loops;
401         uint64_t threshold=1000/(1 + portno_end - portno_start);
402
403         XSEGLOG2(&lc, D, "thread %u\n",  (unsigned int) (t- peer->thread));
404
405         XSEGLOG2(&lc, I, "Thread %u has tid %u.\n", (unsigned int) (t- peer->thread), pid);
406         xseg_init_local_signal(xseg, peer->portno_start);
407         for (;!(isTerminate() && xq_count(&peer->free_reqs) == peer->nr_ops);) {
408                 if (t->func) {
409                         XSEGLOG2(&lc, D, "Thread %u executes function\n", (unsigned int) (t- peer->thread));
410                         xseg_cancel_wait(xseg, peer->portno_start);
411                         t->func(t->arg);
412                         t->func = NULL;
413                         t->arg = NULL;
414                         continue;
415                 }
416
417                 for(loops= threshold; loops > 0; loops--) {
418                         if (loops == 1)
419                                 xseg_prepare_wait(xseg, peer->portno_start);
420                         if (check_ports(peer))
421                                 loops = threshold;
422                 }
423                 XSEGLOG2(&lc, I, "Thread %u goes to sleep\n", (unsigned int) (t- peer->thread));
424                 xseg_wait_signal(xseg, 10000000UL);
425                 xseg_cancel_wait(xseg, peer->portno_start);
426                 XSEGLOG2(&lc, I, "Thread %u woke up\n", (unsigned int) (t- peer->thread));
427         }
428         wake_up_next_thread(peer);
429         return NULL;
430 }
431
432 int peerd_start_threads(struct peerd *peer)
433 {
434         int i;
435         uint32_t nr_threads = peer->nr_threads;
436         //TODO err check
437         for (i = 0; i < nr_threads; i++) {
438                 peer->thread[i].peer = peer;
439                 pthread_cond_init(&peer->thread[i].cond,NULL);
440                 pthread_mutex_init(&peer->thread[i].lock, NULL);
441                 pthread_create(&peer->thread[i].tid, NULL, thread_loop, (void *)(peer->thread + i));
442                 peer->thread[i].func = NULL;
443                 peer->thread[i].arg = NULL;
444
445         }
446         return 0;
447 }
448 #endif
449
450 void defer_request(struct peerd *peer, struct peer_req *pr)
451 {
452         // assert canDefer(peer);
453 //      xseg_submit(peer->xseg, peer->defer_portno, pr->req);
454 //      xseg_signal(peer->xseg, peer->defer_portno);
455 //      free_peer_req(peer, pr);
456 }
457
458 static int peerd_loop(struct peerd *peer) 
459 {
460 #ifdef MT
461         int i;
462         if (peer->interactive_func)
463                 peer->interactive_func();
464         for (i = 0; i < peer->nr_threads; i++) {
465                 pthread_join(peer->thread[i].tid, NULL);
466         }
467 #else
468         struct xseg *xseg = peer->xseg;
469         xport portno_start = peer->portno_start;
470         xport portno_end = peer->portno_end;
471         uint64_t threshold=1000/(1 + portno_end - portno_start);
472         pid_t pid =syscall(SYS_gettid);
473         uint64_t loops;
474
475         XSEGLOG2(&lc, I, "Peer has tid %u.\n", pid);
476         xseg_init_local_signal(xseg, peer->portno_start);
477         for (;!(isTerminate() && xq_count(&peer->free_reqs) == peer->nr_ops);) {
478                 for(loops= threshold; loops > 0; loops--) {
479                         if (loops == 1)
480                                 xseg_prepare_wait(xseg, peer->portno_start);
481                         if (check_ports(peer))
482                                 loops = threshold;
483                 }
484 #ifdef ST_THREADS
485                 if (ta){
486                         st_sleep(0);
487                 } else {
488 #endif
489                         XSEGLOG2(&lc, I, "Peer goes to sleep\n");
490                         xseg_wait_signal(xseg, 10000000UL);
491                         xseg_cancel_wait(xseg, peer->portno_start);
492                         XSEGLOG2(&lc, I, "Peer woke up\n");
493 #ifdef ST_THREADS
494                 }
495 #endif
496         }
497         xseg_quit_local_signal(xseg, peer->portno_start);
498 #endif
499         return 0;
500 }
501
502 static struct xseg *join(char *spec)
503 {
504         struct xseg_config config;
505         struct xseg *xseg;
506
507         (void)xseg_parse_spec(spec, &config);
508         xseg = xseg_join(config.type, config.name, PEER_TYPE, NULL);
509         if (xseg)
510                 return xseg;
511
512         (void)xseg_create(&config);
513         return xseg_join(config.type, config.name, PEER_TYPE, NULL);
514 }
515
516 static struct peerd* peerd_init(uint32_t nr_ops, char* spec, long portno_start,
517                         long portno_end, uint32_t nr_threads, uint32_t defer_portno)
518 {
519         int i;
520         struct peerd *peer;
521         struct xseg_port *port;
522
523 #ifdef ST_THREADS
524         st_init();
525 #endif
526         peer = malloc(sizeof(struct peerd));
527         if (!peer) {
528                 perror("malloc");
529                 return NULL;
530         }
531         peer->nr_ops = nr_ops;
532         peer->defer_portno = defer_portno;
533 #ifdef MT
534         peer->nr_threads = nr_threads;
535         peer->thread = calloc(nr_threads, sizeof(struct thread));
536         if (!peer->thread)
537                 goto malloc_fail;
538 #endif
539         peer->peer_reqs = calloc(nr_ops, sizeof(struct peer_req));
540         if (!peer->peer_reqs){
541 malloc_fail:
542                 perror("malloc");
543                 return NULL;
544         }
545
546         if (!xq_alloc_seq(&peer->free_reqs, nr_ops, nr_ops))
547                 goto malloc_fail;
548 #ifdef MT
549         if (!xq_alloc_empty(&peer->threads, nr_threads))
550                 goto malloc_fail;
551 #endif
552         if (xseg_initialize()){
553                 printf("cannot initialize library\n");
554                 return NULL;
555         }
556         peer->xseg = join(spec);
557         if (!peer->xseg) 
558                 return NULL;
559
560         peer->portno_start = (xport) portno_start;
561         peer->portno_end= (xport) portno_end;
562         port = xseg_bind_port(peer->xseg, peer->portno_start, NULL);
563         if (!port){
564                 printf("cannot bind to port %u\n", (unsigned int) peer->portno_start);
565                 return NULL;
566         }
567
568         xport p;
569         for (p = peer->portno_start + 1; p <= peer->portno_end; p++) {
570                 struct xseg_port *tmp;
571                 tmp = xseg_bind_port(peer->xseg, p, (void *)xseg_get_signal_desc(peer->xseg, port));
572                 if (!tmp){
573                         printf("cannot bind to port %u\n", (unsigned int) p);
574                         return NULL;
575                 }
576         }
577
578         printf("Peer on ports  %u-%u\n", peer->portno_start,
579                         peer->portno_end);
580
581         for (i = 0; i < nr_ops; i++) {
582                 peer->peer_reqs[i].peer = peer;
583                 peer->peer_reqs[i].req = NULL;
584                 peer->peer_reqs[i].retval = 0;
585                 peer->peer_reqs[i].priv = NULL;
586                 peer->peer_reqs[i].portno = NoPort;
587 #ifdef ST_THREADS
588                 peer->peer_reqs[i].cond = st_cond_new(); //FIXME err check
589 #endif
590         }
591 #ifdef MT
592         peer->interactive_func = NULL;
593 #endif
594         return peer;
595 }
596
597 int pidfile_remove(char *path, int fd)
598 {
599         close(fd);
600         return (unlink(path));
601 }
602
603 int pidfile_write(int pid_fd)
604 {
605         char buf[16];
606         snprintf(buf, sizeof(buf), "%ld", syscall(SYS_gettid));
607         buf[15] = 0;
608         
609         lseek(pid_fd, 0, SEEK_SET);
610         int ret = write(pid_fd, buf, strlen(buf));
611         return ret;
612 }
613
614 int pidfile_read(char *path, pid_t *pid)
615 {
616         char buf[16], *endptr;
617         *pid = 0;
618
619         int fd = open(path, O_RDONLY);
620         if (fd < 0)
621                 return -1;
622         int ret = read(fd, buf, 15);
623         buf[15]=0;
624         close(fd);
625         if (ret < 0)
626                 return -1;
627         else{
628                 *pid = strtol(buf, &endptr, 10);
629                 if (endptr != &buf[ret]){
630                         *pid = 0;
631                         return -1;
632                 }
633         }
634         return 0;
635 }
636
637 int pidfile_open(char *path, pid_t *old_pid)
638 {
639         //nfs version > 3
640         int fd = open(path, O_CREAT|O_EXCL|O_WRONLY);
641         if (fd < 0){
642                 if (errno == -EEXIST)
643                         pidfile_read(path, old_pid);
644         }
645         return fd;
646 }
647
648 void usage(char *argv0)
649 {
650         fprintf(stderr, "Usage: %s [general options] [custom peer options]\n\n", argv0);
651         fprintf(stderr, "General peer options:\n"
652                 "  Option      | Default | \n"
653                 "  --------------------------------------------\n"
654                 "    -g        | None    | Segment spec to join\n"
655                 "    -sp       | NoPort  | Start portno to bind\n"
656                 "    -ep       | NoPort  | End portno to bind\n"
657                 "    -p        | NoPort  | Portno to bind\n"
658                 "    -n        | 16      | Number of ops\n"
659                 "    -v        | 0       | Verbosity level\n"
660                 "    -l        | None    | Logfile \n"
661                 "    -d        | No      | Daemonize \n"
662                 "    --pidfile | None    | Pidfile \n"
663 #ifdef MT
664                 "    -t        | No      | Number of threads \n"
665 #endif
666                 "\n"
667                );
668         custom_peer_usage();
669 }
670
671 int main(int argc, char *argv[])
672 {
673         struct peerd *peer = NULL;
674         //parse args
675         int r;
676         long portno_start = -1, portno_end = -1, portno = -1;
677
678         //set defaults here
679         int daemonize = 0, help = 0;
680         uint32_t nr_ops = 16;
681         uint32_t nr_threads = 1;
682         unsigned int debug_level = 0;
683         uint32_t defer_portno = NoPort;
684         pid_t old_pid;
685         int pid_fd = -1;
686
687         char spec[MAX_SPEC_LEN + 1];
688         char logfile[MAX_LOGFILE_LEN + 1];
689         char pidfile[MAX_PIDFILE_LEN + 1];
690
691         logfile[0] = 0;
692         pidfile[0] = 0;
693         spec[0] = 0;
694
695         //capture here -g spec, -n nr_ops, -p portno, -t nr_threads -v verbose level
696         // -dp xseg_portno to defer blocking requests
697         // -l log file ?
698         //TODO print messages on arg parsing error
699         //TODO string checking
700
701         /*
702         for (i = 1; i < argc; i++) {
703                 if (!strcmp(argv[i], "-g") && i + 1 < argc) {
704                         spec = argv[i+1];
705                         i += 1;
706                         continue;
707                 }
708
709                 if (!strcmp(argv[i], "-sp") && i + 1 < argc) {
710                         portno_start = strtoul(argv[i+1], NULL, 10);
711                         i += 1;
712                         continue;
713                 }
714
715                 if (!strcmp(argv[i], "-ep") && i + 1 < argc) {
716                         portno_end = strtoul(argv[i+1], NULL, 10);
717                         i += 1;
718                         continue;
719                 }
720
721                 if (!strcmp(argv[i], "-p") && i + 1 < argc) {
722                         portno = strtoul(argv[i+1], NULL, 10);
723                         i += 1;
724                         continue;
725                 }
726
727                 if (!strcmp(argv[i], "-n") && i + 1 < argc) {
728                         nr_ops = strtoul(argv[i+1], NULL, 10);
729                         i += 1;
730                         continue;
731                 }
732                 if (!strcmp(argv[i], "-v") && i + 1 < argc ) {
733                         debug_level = atoi(argv[i+1]);
734                         i += 1;
735                         continue;
736                 }
737                 if (!strcmp(argv[i], "-t") && i + 1 < argc ) {
738                         nr_threads = strtoul(argv[i+1], NULL, 10);
739                         i += 1;
740                         continue;
741                 }
742                 if (!strcmp(argv[i], "-dp") && i + 1 < argc ) {
743                         defer_portno = strtoul(argv[i+1], NULL, 10);
744                         i += 1;
745                         continue;
746                 }
747                 if (!strcmp(argv[i], "-l") && i + 1 < argc ) {
748                         logfile = argv[i+1];
749                         i += 1;
750                         continue;
751                 }
752                 if (!strcmp(argv[i], "-d")) {
753                         daemonize = 1;
754                         continue;
755                 }
756                 if (!strcmp(argv[i], "--pidfile") && i + 1 < argc ) {
757                         pidfile = argv[i+1];
758                         i += 1;
759                         continue;
760                 }
761
762         }
763         */
764         BEGIN_READ_ARGS(argc, argv);
765         READ_ARG_STRING("-g", spec, MAX_SPEC_LEN);
766         READ_ARG_ULONG("-sp", portno_start);
767         READ_ARG_ULONG("-ep", portno_end);
768         READ_ARG_ULONG("-p", portno);
769         READ_ARG_ULONG("-n", nr_ops);
770         READ_ARG_ULONG("-v", debug_level);
771 #ifdef MT
772         READ_ARG_ULONG("-t", nr_threads);
773 #endif
774 //      READ_ARG_ULONG("-dp", defer_portno);
775         READ_ARG_STRING("-l", logfile, MAX_LOGFILE_LEN);
776         READ_ARG_BOOL("-d", daemonize);
777         READ_ARG_BOOL("-h", help);
778         READ_ARG_BOOL("--help", help);
779         READ_ARG_STRING("--pidfile", pidfile, MAX_PIDFILE_LEN);
780         END_READ_ARGS();
781
782         if (help){
783                 usage(argv[0]);
784                 return 0;
785         }
786
787         r = init_logctx(&lc, argv[0], debug_level, logfile,
788                         REDIRECT_STDOUT|REDIRECT_STDERR);
789         if (r < 0){
790                 XSEGLOG("Cannot initialize logging to logfile");
791                 return -1;
792         }
793         XSEGLOG2(&lc, D, "Main thread has tid %ld.\n", syscall(SYS_gettid));
794
795         if (pidfile[0]){
796                 pid_fd = pidfile_open(pidfile, &old_pid);
797                 if (pid_fd < 0) {
798                         if (old_pid) {
799                                 XSEGLOG2(&lc, E, "Daemon already running, pid: %d.", old_pid);
800                         } else {
801                                 XSEGLOG2(&lc, E, "Cannot open or create pidfile");
802                         }
803                         return -1;
804                 }
805         }
806
807         if (daemonize){
808                 if (daemon(0, 1) < 0){
809                         XSEGLOG2(&lc, E, "Cannot daemonize");
810                         r = -1;
811                         goto out;
812                 }
813         }
814
815         pidfile_write(pid_fd);
816
817         //TODO perform argument sanity checks
818         verbose = debug_level;
819         if (portno != -1) {
820                 portno_start = portno;
821                 portno_end = portno;
822         }
823
824         peer = peerd_init(nr_ops, spec, portno_start, portno_end, nr_threads, defer_portno);
825         if (!peer){
826                 r = -1;
827                 goto out;
828         }
829         setup_signals(peer);
830         r = custom_peer_init(peer, argc, argv);
831         if (r < 0)
832                 goto out;
833 #ifdef MT
834         //TODO err check
835         peerd_start_threads(peer);
836 #endif
837
838 #ifdef ST_THREADS
839         st_thread_t st = st_thread_create(peerd_loop, peer, 1, 0);
840         r = st_thread_join(st, NULL);
841 #else
842         r = peerd_loop(peer);
843 #endif
844 out:
845         if (pid_fd > 0)
846                 pidfile_remove(pidfile, pid_fd);
847         return r;
848 }