Merge branch 'xseg-refactor' into debian
[archipelago] / xseg / peers / user / peer.c
1 #define _GNU_SOURCE
2 #include <stdio.h>
3 #include <stdlib.h>
4 #include <sys/types.h>
5 #include <unistd.h>
6 #include <sys/syscall.h>
7 #include <sys/time.h>
8 #include <signal.h>
9 #include <sys/stat.h>
10 #include <fcntl.h>
11 #include <errno.h>
12
13 #ifdef MT
14 #include <pthread.h>
15 #endif
16
17 #include <xseg/xseg.h>
18 #include <peer.h>
19
20 #ifdef MT
21 #define PEER_TYPE "pthread"
22 #else
23 #define PEER_TYPE "posix"
24 #endif
25
26 //FIXME this should not be defined here probably
27 #define MAX_SPEC_LEN 128
28 #define MAX_PIDFILE_LEN 512
29
30 volatile unsigned int terminated = 0;
31 unsigned int verbose = 0;
32 struct log_ctx lc;
33 #ifdef ST_THREADS
34 uint32_t ta = 0;
35 #endif
36
37 #ifdef MT
/* Peer recorded by setup_signals() so that signal_handler(), which receives
 * no user argument, can still signal the peer's first port. */
struct peerd *global_peer;

/* Per-worker state of the pthread based peer. */
struct thread {
        struct peerd *peer;             /* peer this worker belongs to */
        pthread_t tid;                  /* filled in by pthread_create() */
        pthread_cond_t cond;            /* signalled by __wake_up_thread() */
        pthread_mutex_t lock;           /* held while cond is signalled */
        void (*func)(void *arg);        /* one-shot job for thread_loop(), or NULL */
        void *arg;                      /* argument handed to func */
};
48
49
50 inline static struct thread* alloc_thread(struct peerd *peer)
51 {
52         xqindex idx = xq_pop_head(&peer->threads, 1);
53         if (idx == Noneidx)
54                 return NULL;
55         return peer->thread + idx;
56 }
57
58 inline static void free_thread(struct peerd *peer, struct thread *t)
59 {
60         xqindex idx = t - peer->thread;
61         xq_append_head(&peer->threads, idx, 1);
62 }
63
64
65 inline static void __wake_up_thread(struct thread *t)
66 {
67         pthread_mutex_lock(&t->lock);
68         pthread_cond_signal(&t->cond);
69         pthread_mutex_unlock(&t->lock);
70 }
71
/* NULL-tolerant wrapper around __wake_up_thread(). */
inline static void wake_up_thread(struct thread* t)
{
        if (!t)
                return;
        __wake_up_thread(t);
}
78
79 inline static int wake_up_next_thread(struct peerd *peer)
80 {
81         return (xseg_signal(peer->xseg, peer->portno_start));
82 }
83 #endif
84
85
86 static inline int isTerminate()
87 {
88 /* ta doesn't need to be taken into account, because the main loops
89  * doesn't check the terminated flag if ta is not 0.
90  */
91         /*
92 #ifdef ST_THREADS
93         return (!ta & terminated);
94 #else
95         return terminated;
96 #endif
97         */
98         return terminated;
99 }
100
101 void signal_handler(int signal)
102 {
103         XSEGLOG2(&lc, I, "Caught signal. Terminating gracefully");
104         terminated = 1;
105 #ifdef MT
106         wake_up_next_thread(global_peer);
107 #endif
108 }
109
110 void renew_logfile(int signal)
111 {
112         XSEGLOG2(&lc, I, "Caught signal. Renewing logfile");
113         renew_logctx(&lc, NULL, verbose, NULL, REOPEN_FILE);
114 }
115
116 static int setup_signals(struct peerd *peer)
117 {
118         int r;
119         struct sigaction sa;
120 #ifdef MT
121         global_peer = peer;
122 #endif
123         sigemptyset(&sa.sa_mask);
124         sa.sa_flags = 0;
125         sa.sa_handler = signal_handler;
126         r = sigaction(SIGTERM, &sa, NULL);
127         if (r < 0)
128                 return r;
129         r = sigaction(SIGINT, &sa, NULL);
130         if (r < 0)
131                 return r;
132         r = sigaction(SIGQUIT, &sa, NULL);
133         if (r < 0)
134                 return r;
135
136         sa.sa_handler = renew_logfile;
137         r = sigaction(SIGUSR1, &sa, NULL);
138         if (r < 0)
139                 return r;
140
141         return r;
142 }
143
144 inline int canDefer(struct peerd *peer)
145 {
146         return !(peer->defer_portno == NoPort);
147 }
148
149 void print_req(struct xseg *xseg, struct xseg_request *req)
150 {
151         char target[64], data[64];
152         char *req_target, *req_data;
153         unsigned int end = (req->targetlen> 63) ? 63 : req->targetlen;
154         req_target = xseg_get_target(xseg, req);
155         req_data = xseg_get_data(xseg, req);
156
157         if (1) {
158                 strncpy(target, req_target, end);
159                 target[end] = 0;
160                 strncpy(data, req_data, 63);
161                 data[63] = 0;
162                 printf("req id:%lu, op:%u %llu:%lu serviced: %lu, reqstate: %u\n"
163                                 "src: %u, st: %u, dst: %u dt: %u\n"
164                                 "target[%u]:'%s', data[%llu]:\n%s------------------\n\n",
165                                 (unsigned long)(req),
166                                 (unsigned int)req->op,
167                                 (unsigned long long)req->offset,
168                                 (unsigned long)req->size,
169                                 (unsigned long)req->serviced,
170                                 (unsigned int)req->state,
171                                 (unsigned int)req->src_portno,
172                                 (unsigned int)req->src_transit_portno,
173                                 (unsigned int)req->dst_portno,
174                                 (unsigned int)req->dst_transit_portno,
175                                 (unsigned int)req->targetlen, target,
176                                 (unsigned long long)req->datalen, data);
177         }
178 }
179 void log_pr(char *msg, struct peer_req *pr)
180 {
181         char target[64], data[64];
182         char *req_target, *req_data;
183         struct peerd *peer = pr->peer;
184         struct xseg *xseg = pr->peer->xseg;
185         req_target = xseg_get_target(xseg, pr->req);
186         req_data = xseg_get_data(xseg, pr->req);
187         /* null terminate name in case of req->target is less than 63 characters,
188          * and next character after name (aka first byte of next buffer) is not
189          * null
190          */
191         unsigned int end = (pr->req->targetlen> 63) ? 63 : pr->req->targetlen;
192         if (verbose) {
193                 strncpy(target, req_target, end);
194                 target[end] = 0;
195                 strncpy(data, req_data, 63);
196                 data[63] = 0;
197                 printf("%s: req id:%u, op:%u %llu:%lu serviced: %lu, retval: %lu, reqstate: %u\n"
198                                 "target[%u]:'%s', data[%llu]:\n%s------------------\n\n",
199                                 msg,
200                                 (unsigned int)(pr - peer->peer_reqs),
201                                 (unsigned int)pr->req->op,
202                                 (unsigned long long)pr->req->offset,
203                                 (unsigned long)pr->req->size,
204                                 (unsigned long)pr->req->serviced,
205                                 (unsigned long)pr->retval,
206                                 (unsigned int)pr->req->state,
207                                 (unsigned int)pr->req->targetlen, target,
208                                 (unsigned long long)pr->req->datalen, data);
209         }
210 }
211
212 inline struct peer_req *alloc_peer_req(struct peerd *peer)
213 {
214         xqindex idx = xq_pop_head(&peer->free_reqs, 1);
215         if (idx == Noneidx)
216                 return NULL;
217         return peer->peer_reqs + idx;
218 }
219
220 inline void free_peer_req(struct peerd *peer, struct peer_req *pr)
221 {
222         xqindex idx = pr - peer->peer_reqs;
223         pr->req = NULL;
224         xq_append_head(&peer->free_reqs, idx, 1);
225 }
226
/* Accumulators for time spent in xseg_respond(). Nothing in this file
 * updates them at present: the timing code in complete() is disabled. */
struct timeval resp_start, resp_end, resp_accum = {0, 0};
uint64_t responds = 0;

/*
 * Print the accumulated respond time and the number of responds to stdout.
 * tv_sec / tv_usec are cast explicitly because their underlying types are
 * platform dependent and must match the %lu conversions.
 */
void get_responds_stats(){
        printf("Time waiting respond %lu.%06lu sec for %llu times.\n",
                        (unsigned long) resp_accum.tv_sec,
                        (unsigned long) resp_accum.tv_usec,
                        (long long unsigned int) responds);
}
234
235 //FIXME error check
236 void fail(struct peerd *peer, struct peer_req *pr)
237 {
238         struct xseg_request *req = pr->req;
239         uint32_t p;
240         XSEGLOG2(&lc, D, "failing req %u", (unsigned int) (pr - peer->peer_reqs));
241         req->state |= XS_FAILED;
242         //xseg_set_req_data(peer->xseg, pr->req, NULL);
243         p = xseg_respond(peer->xseg, req, pr->portno, X_ALLOC);
244         xseg_signal(peer->xseg, p);
245         free_peer_req(peer, pr);
246 #ifdef MT
247         wake_up_next_thread(peer);
248 #endif
249 }
250
251 //FIXME error check
252 void complete(struct peerd *peer, struct peer_req *pr)
253 {
254         struct xseg_request *req = pr->req;
255         uint32_t p;
256         req->state |= XS_SERVED;
257         //xseg_set_req_data(peer->xseg, pr->req, NULL);
258         //gettimeofday(&resp_start, NULL);
259         p = xseg_respond(peer->xseg, req, pr->portno, X_ALLOC);
260         //gettimeofday(&resp_end, NULL);
261         //responds++;
262         //timersub(&resp_end, &resp_start, &resp_end);
263         //timeradd(&resp_end, &resp_accum, &resp_accum);
264         //printf("xseg_signal: %u\n", p);
265         xseg_signal(peer->xseg, p);
266         free_peer_req(peer, pr);
267 #ifdef MT
268         wake_up_next_thread(peer);
269 #endif
270 }
271
272 static void handle_accepted(struct peerd *peer, struct peer_req *pr, 
273                                 struct xseg_request *req)
274 {
275         struct xseg_request *xreq = pr->req;
276         //assert xreq == req;
277         XSEGLOG2(&lc, D, "Handle accepted");
278         xreq->serviced = 0;
279         //xreq->state = XS_ACCEPTED;
280         pr->retval = 0;
281         dispatch(peer, pr, req, dispatch_accept);
282 }
283
284 static void handle_received(struct peerd *peer, struct peer_req *pr,
285                                 struct xseg_request *req)
286 {
287         //struct xseg_request *req = pr->req;
288         //assert req->state != XS_ACCEPTED;
289         XSEGLOG2(&lc, D, "Handle received \n");
290         dispatch(peer, pr, req, dispatch_receive);
291
292 }
/* Accumulators for time spent in xseg_submit(). Nothing in this file
 * updates them at present: the timing code in submit_peer_req() is
 * disabled. */
struct timeval sub_start, sub_end, sub_accum = {0, 0};
uint64_t submits = 0;

/*
 * Print the accumulated submit time and the number of submits to stdout.
 * tv_sec / tv_usec are cast explicitly because their underlying types are
 * platform dependent and must match the %lu conversions.
 */
void get_submits_stats(){
        printf("Time waiting submit %lu.%06lu sec for %llu times.\n",
                        (unsigned long) sub_accum.tv_sec,
                        (unsigned long) sub_accum.tv_usec,
                        (long long unsigned int) submits);
}
300
301 int submit_peer_req(struct peerd *peer, struct peer_req *pr)
302 {
303         uint32_t ret;
304         struct xseg_request *req = pr->req;
305         // assert req->portno == peer->portno ?
306         //TODO small function with error checking
307         XSEGLOG2 (&lc, D, "submitting peer req %u\n", (unsigned int)(pr - peer->peer_reqs));
308         ret = xseg_set_req_data(peer->xseg, req, (void *)(pr));
309         if (ret < 0)
310                 return -1;
311         //printf("pr: %x , req_data: %x \n", pr, xseg_get_req_data(peer->xseg, req));
312         //gettimeofday(&sub_start, NULL);
313         ret = xseg_submit(peer->xseg, req, pr->portno, X_ALLOC);
314         //gettimeofday(&sub_end, NULL);
315         //submits++;
316         //timersub(&sub_end, &sub_start, &sub_end);
317         //timeradd(&sub_end, &sub_accum, &sub_accum);
318         if (ret == NoPort)
319                 return -1;
320         xseg_signal(peer->xseg, ret);
321         return 0;
322 }
323
324 static int check_ports(struct peerd *peer)
325 {
326         struct xseg *xseg = peer->xseg;
327         xport portno_start = peer->portno_start;
328         xport portno_end = peer->portno_end;
329         struct xseg_request *accepted, *received;
330         struct peer_req *pr;
331         xport i;
332         int  r, c = 0;
333
334         for (i = portno_start; i <= portno_end; i++) {
335                 accepted = NULL;
336                 received = NULL;
337                 if (!isTerminate()) {
338                         pr = alloc_peer_req(peer);
339                         if (pr) {
340                                 accepted = xseg_accept(xseg, i, X_NONBLOCK);
341                                 if (accepted) {
342                                         pr->req = accepted;
343                                         pr->portno = i;
344                                         xseg_cancel_wait(xseg, i);
345                                         handle_accepted(peer, pr, accepted);
346                                         c = 1;
347                                 }
348                                 else {
349                                         free_peer_req(peer, pr);
350                                 }
351                         }
352                 }
353                 received = xseg_receive(xseg, i, X_NONBLOCK);
354                 if (received) {
355                         r =  xseg_get_req_data(xseg, received, (void **) &pr);
356                         if (r < 0 || !pr){
357                                 XSEGLOG2(&lc, W, "Received request with no pr data\n");
358                                 xport p = xseg_respond(peer->xseg, received, peer->portno_start, X_ALLOC);
359                                 if (p == NoPort){
360                                         XSEGLOG2(&lc, W, "Could not respond stale request");
361                                         xseg_put_request(xseg, received, portno_start);
362                                         continue;
363                                 } else {
364                                         xseg_signal(xseg, p);
365                                 }
366                         } else {
367                                 //maybe perform sanity check for pr
368                                 xseg_cancel_wait(xseg, i);
369                                 handle_received(peer, pr, received);
370                                 c = 1;
371                         }
372                 }
373         }
374
375         return c;
376 }
377
378 #ifdef MT
379 int thread_execute(struct peerd *peer, void (*func)(void *arg), void *arg)
380 {
381         struct thread *t = alloc_thread(peer);
382         if (t) {
383                 t->func = func;
384                 t->arg = arg;
385                 wake_up_thread(t);
386                 return 0;
387         } else
388                 // we could hijack a thread
389                 return -1;
390 }
391
392 static void* thread_loop(void *arg)
393 {
394         struct thread *t = (struct thread *) arg;
395         struct peerd *peer = t->peer;
396         struct xseg *xseg = peer->xseg;
397         xport portno_start = peer->portno_start;
398         xport portno_end = peer->portno_end;
399         pid_t pid =syscall(SYS_gettid);
400         uint64_t loops;
401         uint64_t threshold=1000/(1 + portno_end - portno_start);
402
403         XSEGLOG2(&lc, D, "thread %u\n",  (unsigned int) (t- peer->thread));
404
405         XSEGLOG2(&lc, I, "Thread %u has tid %u.\n", (unsigned int) (t- peer->thread), pid);
406         xseg_init_local_signal(xseg, peer->portno_start);
407         for (;!(isTerminate() && xq_count(&peer->free_reqs) == peer->nr_ops);) {
408                 if (t->func) {
409                         XSEGLOG2(&lc, D, "Thread %u executes function\n", (unsigned int) (t- peer->thread));
410                         xseg_cancel_wait(xseg, peer->portno_start);
411                         t->func(t->arg);
412                         t->func = NULL;
413                         t->arg = NULL;
414                         continue;
415                 }
416
417                 for(loops= threshold; loops > 0; loops--) {
418                         if (loops == 1)
419                                 xseg_prepare_wait(xseg, peer->portno_start);
420                         if (check_ports(peer))
421                                 loops = threshold;
422                 }
423                 XSEGLOG2(&lc, I, "Thread %u goes to sleep\n", (unsigned int) (t- peer->thread));
424                 xseg_wait_signal(xseg, 10000000UL);
425                 xseg_cancel_wait(xseg, peer->portno_start);
426                 XSEGLOG2(&lc, I, "Thread %u woke up\n", (unsigned int) (t- peer->thread));
427         }
428         wake_up_next_thread(peer);
429         custom_peer_finalize(peer);
430         return NULL;
431 }
432
433 int peerd_start_threads(struct peerd *peer)
434 {
435         int i;
436         uint32_t nr_threads = peer->nr_threads;
437         //TODO err check
438         for (i = 0; i < nr_threads; i++) {
439                 peer->thread[i].peer = peer;
440                 pthread_cond_init(&peer->thread[i].cond,NULL);
441                 pthread_mutex_init(&peer->thread[i].lock, NULL);
442                 pthread_create(&peer->thread[i].tid, NULL, thread_loop, (void *)(peer->thread + i));
443                 peer->thread[i].func = NULL;
444                 peer->thread[i].arg = NULL;
445
446         }
447         return 0;
448 }
449 #endif
450
451
/*
 * Placeholder: deferring a request is not implemented, so this function
 * currently does nothing. The disabled draft submitted pr->req to
 * peer->defer_portno, signalled that port and released pr; callers would be
 * expected to check canDefer(peer) first.
 */
void defer_request(struct peerd *peer, struct peer_req *pr)
{
}
459
460 static int peerd_loop(struct peerd *peer) 
461 {
462 #ifdef MT
463         int i;
464         if (peer->interactive_func)
465                 peer->interactive_func();
466         for (i = 0; i < peer->nr_threads; i++) {
467                 pthread_join(peer->thread[i].tid, NULL);
468         }
469 #else
470         struct xseg *xseg = peer->xseg;
471         xport portno_start = peer->portno_start;
472         xport portno_end = peer->portno_end;
473         uint64_t threshold=1000/(1 + portno_end - portno_start);
474         pid_t pid =syscall(SYS_gettid);
475         uint64_t loops;
476
477         XSEGLOG2(&lc, I, "Peer has tid %u.\n", pid);
478         xseg_init_local_signal(xseg, peer->portno_start);
479         for (;!(isTerminate() && xq_count(&peer->free_reqs) == peer->nr_ops);) {
480                 for(loops= threshold; loops > 0; loops--) {
481                         if (loops == 1)
482                                 xseg_prepare_wait(xseg, peer->portno_start);
483                         if (check_ports(peer))
484                                 loops = threshold;
485                 }
486 #ifdef ST_THREADS
487                 if (ta){
488                         st_sleep(0);
489                 } else {
490 #endif
491                         XSEGLOG2(&lc, I, "Peer goes to sleep\n");
492                         xseg_wait_signal(xseg, 10000000UL);
493                         xseg_cancel_wait(xseg, peer->portno_start);
494                         XSEGLOG2(&lc, I, "Peer woke up\n");
495 #ifdef ST_THREADS
496                 }
497 #endif
498         }
499         custom_peer_finalize(peer);
500         xseg_quit_local_signal(xseg, peer->portno_start);
501 #endif
502         return 0;
503 }
504
505 static struct xseg *join(char *spec)
506 {
507         struct xseg_config config;
508         struct xseg *xseg;
509
510         (void)xseg_parse_spec(spec, &config);
511         xseg = xseg_join(config.type, config.name, PEER_TYPE, NULL);
512         if (xseg)
513                 return xseg;
514
515         (void)xseg_create(&config);
516         return xseg_join(config.type, config.name, PEER_TYPE, NULL);
517 }
518
519 static struct peerd* peerd_init(uint32_t nr_ops, char* spec, long portno_start,
520                         long portno_end, uint32_t nr_threads, uint32_t defer_portno)
521 {
522         int i;
523         struct peerd *peer;
524         struct xseg_port *port;
525
526 #ifdef ST_THREADS
527         st_init();
528 #endif
529         peer = malloc(sizeof(struct peerd));
530         if (!peer) {
531                 perror("malloc");
532                 return NULL;
533         }
534         peer->nr_ops = nr_ops;
535         peer->defer_portno = defer_portno;
536 #ifdef MT
537         peer->nr_threads = nr_threads;
538         peer->thread = calloc(nr_threads, sizeof(struct thread));
539         if (!peer->thread)
540                 goto malloc_fail;
541 #endif
542         peer->peer_reqs = calloc(nr_ops, sizeof(struct peer_req));
543         if (!peer->peer_reqs){
544 malloc_fail:
545                 perror("malloc");
546                 return NULL;
547         }
548
549         if (!xq_alloc_seq(&peer->free_reqs, nr_ops, nr_ops))
550                 goto malloc_fail;
551 #ifdef MT
552         if (!xq_alloc_empty(&peer->threads, nr_threads))
553                 goto malloc_fail;
554 #endif
555         if (xseg_initialize()){
556                 printf("cannot initialize library\n");
557                 return NULL;
558         }
559         peer->xseg = join(spec);
560         if (!peer->xseg) 
561                 return NULL;
562
563         peer->portno_start = (xport) portno_start;
564         peer->portno_end= (xport) portno_end;
565         port = xseg_bind_port(peer->xseg, peer->portno_start, NULL);
566         if (!port){
567                 printf("cannot bind to port %u\n", (unsigned int) peer->portno_start);
568                 return NULL;
569         }
570
571         xport p;
572         for (p = peer->portno_start + 1; p <= peer->portno_end; p++) {
573                 struct xseg_port *tmp;
574                 tmp = xseg_bind_port(peer->xseg, p, (void *)xseg_get_signal_desc(peer->xseg, port));
575                 if (!tmp){
576                         printf("cannot bind to port %u\n", (unsigned int) p);
577                         return NULL;
578                 }
579         }
580
581         printf("Peer on ports  %u-%u\n", peer->portno_start,
582                         peer->portno_end);
583
584         for (i = 0; i < nr_ops; i++) {
585                 peer->peer_reqs[i].peer = peer;
586                 peer->peer_reqs[i].req = NULL;
587                 peer->peer_reqs[i].retval = 0;
588                 peer->peer_reqs[i].priv = NULL;
589                 peer->peer_reqs[i].portno = NoPort;
590 #ifdef ST_THREADS
591                 peer->peer_reqs[i].cond = st_cond_new(); //FIXME err check
592 #endif
593         }
594 #ifdef MT
595         peer->interactive_func = NULL;
596 #endif
597         return peer;
598 }
599
/* Close the pidfile descriptor and delete the pidfile. The result of
 * close() is ignored; the return value is that of unlink(). */
int pidfile_remove(char *path, int fd)
{
        int rc;

        close(fd);
        rc = unlink(path);
        return rc;
}
605
/*
 * Write the caller's thread id (SYS_gettid), as decimal text without a
 * trailing newline, at the start of the file behind pid_fd.
 * Returns the result of write(); the result of lseek() is not checked.
 */
int pidfile_write(int pid_fd)
{
        char text[16];
        int written;

        snprintf(text, sizeof(text), "%ld", syscall(SYS_gettid));
        text[sizeof(text) - 1] = 0;

        lseek(pid_fd, 0, SEEK_SET);
        written = write(pid_fd, text, strlen(text));
        return written;
}
616
/*
 * Read a process id from the pidfile at path.
 *
 * At most 15 bytes are read. The content must be a positive decimal number,
 * optionally followed by blanks or line endings (so pidfiles written with a
 * trailing newline are accepted as well).
 *
 * On success stores the id in *pid and returns 0. On any failure (file
 * cannot be opened or read, empty file, malformed content) *pid is set to 0
 * and -1 is returned.
 */
int pidfile_read(char *path, pid_t *pid)
{
        char buf[16], *endptr;
        ssize_t len;
        long value;
        int fd;

        *pid = 0;

        fd = open(path, O_RDONLY);
        if (fd < 0)
                return -1;
        len = read(fd, buf, sizeof(buf) - 1);
        close(fd);
        if (len <= 0)
                return -1;
        /* Terminate exactly after the bytes read, so that strtol() never
         * looks at uninitialised parts of the buffer. */
        buf[len] = '\0';

        value = strtol(buf, &endptr, 10);
        if (endptr == buf || value <= 0)
                return -1;
        while (*endptr == '\n' || *endptr == '\r' ||
               *endptr == ' ' || *endptr == '\t')
                endptr++;
        /* Anything left over (including an embedded NUL) is garbage. */
        if (endptr != buf + len)
                return -1;

        *pid = (pid_t) value;
        return 0;
}
639
/*
 * Create the pidfile exclusively and return a write-only descriptor to it.
 *
 * O_CREAT requires an explicit mode argument; the file is created with mode
 * 0644 (subject to the process umask).
 *
 * Returns the descriptor, or a negative value on failure. *old_pid is
 * always written: it is 0 unless the file already existed and a pid could
 * be read from it via pidfile_read().
 */
int pidfile_open(char *path, pid_t *old_pid)
{
        int fd;

        *old_pid = 0;
        //nfs version > 3
        fd = open(path, O_CREAT|O_EXCL|O_WRONLY, 0644);
        if (fd < 0 && errno == EEXIST)
                pidfile_read(path, old_pid);
        return fd;
}
650
/* Print the general option table to stderr, then let the concrete peer
 * describe its own options through custom_peer_usage(). */
void usage(char *argv0)
{
        fprintf(stderr, "Usage: %s [general options] [custom peer options]\n\n", argv0);
        fputs("General peer options:\n"
                "  Option      | Default | \n"
                "  --------------------------------------------\n"
                "    -g        | None    | Segment spec to join\n"
                "    -sp       | NoPort  | Start portno to bind\n"
                "    -ep       | NoPort  | End portno to bind\n"
                "    -p        | NoPort  | Portno to bind\n"
                "    -n        | 16      | Number of ops\n"
                "    -v        | 0       | Verbosity level\n"
                "    -l        | None    | Logfile \n"
                "    -d        | No      | Daemonize \n"
                "    --pidfile | None    | Pidfile \n"
#ifdef MT
                "    -t        | No      | Number of threads \n"
#endif
                "\n",
                stderr);
        custom_peer_usage();
}
673
674 int main(int argc, char *argv[])
675 {
676         struct peerd *peer = NULL;
677         //parse args
678         int r;
679         long portno_start = -1, portno_end = -1, portno = -1;
680
681         //set defaults here
682         int daemonize = 0, help = 0;
683         uint32_t nr_ops = 16;
684         uint32_t nr_threads = 1;
685         unsigned int debug_level = 0;
686         uint32_t defer_portno = NoPort;
687         pid_t old_pid;
688         int pid_fd = -1;
689
690         char spec[MAX_SPEC_LEN + 1];
691         char logfile[MAX_LOGFILE_LEN + 1];
692         char pidfile[MAX_PIDFILE_LEN + 1];
693
694         logfile[0] = 0;
695         pidfile[0] = 0;
696         spec[0] = 0;
697
698         //capture here -g spec, -n nr_ops, -p portno, -t nr_threads -v verbose level
699         // -dp xseg_portno to defer blocking requests
700         // -l log file ?
701         //TODO print messages on arg parsing error
702         //TODO string checking
703
704         /*
705         for (i = 1; i < argc; i++) {
706                 if (!strcmp(argv[i], "-g") && i + 1 < argc) {
707                         spec = argv[i+1];
708                         i += 1;
709                         continue;
710                 }
711
712                 if (!strcmp(argv[i], "-sp") && i + 1 < argc) {
713                         portno_start = strtoul(argv[i+1], NULL, 10);
714                         i += 1;
715                         continue;
716                 }
717
718                 if (!strcmp(argv[i], "-ep") && i + 1 < argc) {
719                         portno_end = strtoul(argv[i+1], NULL, 10);
720                         i += 1;
721                         continue;
722                 }
723
724                 if (!strcmp(argv[i], "-p") && i + 1 < argc) {
725                         portno = strtoul(argv[i+1], NULL, 10);
726                         i += 1;
727                         continue;
728                 }
729
730                 if (!strcmp(argv[i], "-n") && i + 1 < argc) {
731                         nr_ops = strtoul(argv[i+1], NULL, 10);
732                         i += 1;
733                         continue;
734                 }
735                 if (!strcmp(argv[i], "-v") && i + 1 < argc ) {
736                         debug_level = atoi(argv[i+1]);
737                         i += 1;
738                         continue;
739                 }
740                 if (!strcmp(argv[i], "-t") && i + 1 < argc ) {
741                         nr_threads = strtoul(argv[i+1], NULL, 10);
742                         i += 1;
743                         continue;
744                 }
745                 if (!strcmp(argv[i], "-dp") && i + 1 < argc ) {
746                         defer_portno = strtoul(argv[i+1], NULL, 10);
747                         i += 1;
748                         continue;
749                 }
750                 if (!strcmp(argv[i], "-l") && i + 1 < argc ) {
751                         logfile = argv[i+1];
752                         i += 1;
753                         continue;
754                 }
755                 if (!strcmp(argv[i], "-d")) {
756                         daemonize = 1;
757                         continue;
758                 }
759                 if (!strcmp(argv[i], "--pidfile") && i + 1 < argc ) {
760                         pidfile = argv[i+1];
761                         i += 1;
762                         continue;
763                 }
764
765         }
766         */
767         BEGIN_READ_ARGS(argc, argv);
768         READ_ARG_STRING("-g", spec, MAX_SPEC_LEN);
769         READ_ARG_ULONG("-sp", portno_start);
770         READ_ARG_ULONG("-ep", portno_end);
771         READ_ARG_ULONG("-p", portno);
772         READ_ARG_ULONG("-n", nr_ops);
773         READ_ARG_ULONG("-v", debug_level);
774 #ifdef MT
775         READ_ARG_ULONG("-t", nr_threads);
776 #endif
777 //      READ_ARG_ULONG("-dp", defer_portno);
778         READ_ARG_STRING("-l", logfile, MAX_LOGFILE_LEN);
779         READ_ARG_BOOL("-d", daemonize);
780         READ_ARG_BOOL("-h", help);
781         READ_ARG_BOOL("--help", help);
782         READ_ARG_STRING("--pidfile", pidfile, MAX_PIDFILE_LEN);
783         END_READ_ARGS();
784
785         if (help){
786                 usage(argv[0]);
787                 return 0;
788         }
789
790         r = init_logctx(&lc, argv[0], debug_level, logfile,
791                         REDIRECT_STDOUT|REDIRECT_STDERR);
792         if (r < 0){
793                 XSEGLOG("Cannot initialize logging to logfile");
794                 return -1;
795         }
796         XSEGLOG2(&lc, D, "Main thread has tid %ld.\n", syscall(SYS_gettid));
797
798         if (pidfile[0]){
799                 pid_fd = pidfile_open(pidfile, &old_pid);
800                 if (pid_fd < 0) {
801                         if (old_pid) {
802                                 XSEGLOG2(&lc, E, "Daemon already running, pid: %d.", old_pid);
803                         } else {
804                                 XSEGLOG2(&lc, E, "Cannot open or create pidfile");
805                         }
806                         return -1;
807                 }
808         }
809
810         if (daemonize){
811                 if (daemon(0, 1) < 0){
812                         XSEGLOG2(&lc, E, "Cannot daemonize");
813                         r = -1;
814                         goto out;
815                 }
816         }
817
818         pidfile_write(pid_fd);
819
820         //TODO perform argument sanity checks
821         verbose = debug_level;
822         if (portno != -1) {
823                 portno_start = portno;
824                 portno_end = portno;
825         }
826
827         peer = peerd_init(nr_ops, spec, portno_start, portno_end, nr_threads, defer_portno);
828         if (!peer){
829                 r = -1;
830                 goto out;
831         }
832         setup_signals(peer);
833         r = custom_peer_init(peer, argc, argv);
834         if (r < 0)
835                 goto out;
836 #ifdef MT
837         //TODO err check
838         peerd_start_threads(peer);
839 #endif
840
841 #ifdef ST_THREADS
842         st_thread_t st = st_thread_create(peerd_loop, peer, 1, 0);
843         r = st_thread_join(st, NULL);
844 #else
845         r = peerd_loop(peer);
846 #endif
847 out:
848         if (pid_fd > 0)
849                 pidfile_remove(pidfile, pid_fd);
850         return r;
851 }