add license headers
[archipelago] / xseg / peers / user / peer.c
1 /*
2  * Copyright 2012 GRNET S.A. All rights reserved.
3  *
4  * Redistribution and use in source and binary forms, with or
5  * without modification, are permitted provided that the following
6  * conditions are met:
7  *
8  *   1. Redistributions of source code must retain the above
9  *      copyright notice, this list of conditions and the following
10  *      disclaimer.
11  *   2. Redistributions in binary form must reproduce the above
12  *      copyright notice, this list of conditions and the following
13  *      disclaimer in the documentation and/or other materials
14  *      provided with the distribution.
15  *
16  * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17  * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18  * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19  * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20  * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23  * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24  * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25  * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26  * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27  * POSSIBILITY OF SUCH DAMAGE.
28  *
29  * The views and conclusions contained in the software and
30  * documentation are those of the authors and should not be
31  * interpreted as representing official policies, either expressed
32  * or implied, of GRNET S.A.
33  */
34
35 #define _GNU_SOURCE
36 #include <stdio.h>
37 #include <stdlib.h>
38 #include <sys/types.h>
39 #include <unistd.h>
40 #include <sys/syscall.h>
41 #include <sys/time.h>
42 #include <signal.h>
43 #include <sys/stat.h>
44 #include <fcntl.h>
45 #include <errno.h>
46
47 #ifdef MT
48 #include <pthread.h>
49 #endif
50
51 #include <xseg/xseg.h>
52 #include <peer.h>
53
54 #ifdef MT
55 #define PEER_TYPE "pthread"
56 #else
57 #define PEER_TYPE "posix"
58 #endif
59
60 //FIXME this should not be defined here probably
61 #define MAX_SPEC_LEN 128
62 #define MAX_PIDFILE_LEN 512
63
64 volatile unsigned int terminated = 0;
65 unsigned int verbose = 0;
66 struct log_ctx lc;
67 #ifdef ST_THREADS
68 uint32_t ta = 0;
69 #endif
70
71 #ifdef MT
/* Peer recorded by setup_signals() so that signal_handler() can reach it. */
struct peerd *global_peer;

/* Per-worker bookkeeping used by the pthread (MT) build. */
struct thread {
	/* Owning peer; assigned in peerd_start_threads(). */
	struct peerd *peer;
	/* Thread handle; joined in peerd_loop(). */
	pthread_t tid;
	/* Condition signalled by __wake_up_thread(). */
	pthread_cond_t cond;
	/* Mutex held while cond is signalled. */
	pthread_mutex_t lock;
	/* Pending job run once by thread_loop(), or NULL when there is none. */
	void (*func)(void *arg);
	/* Argument handed to func. */
	void *arg;
};
82
83
84 inline static struct thread* alloc_thread(struct peerd *peer)
85 {
86         xqindex idx = xq_pop_head(&peer->threads, 1);
87         if (idx == Noneidx)
88                 return NULL;
89         return peer->thread + idx;
90 }
91
92 inline static void free_thread(struct peerd *peer, struct thread *t)
93 {
94         xqindex idx = t - peer->thread;
95         xq_append_head(&peer->threads, idx, 1);
96 }
97
98
99 inline static void __wake_up_thread(struct thread *t)
100 {
101         pthread_mutex_lock(&t->lock);
102         pthread_cond_signal(&t->cond);
103         pthread_mutex_unlock(&t->lock);
104 }
105
/* Wake the given worker; a NULL worker is silently ignored. */
inline static void wake_up_thread(struct thread* t)
{
	if (!t)
		return;
	__wake_up_thread(t);
}
112
113 inline static int wake_up_next_thread(struct peerd *peer)
114 {
115         return (xseg_signal(peer->xseg, peer->portno_start));
116 }
117 #endif
118
119
120 static inline int isTerminate()
121 {
122 /* ta doesn't need to be taken into account, because the main loops
123  * doesn't check the terminated flag if ta is not 0.
124  */
125         /*
126 #ifdef ST_THREADS
127         return (!ta & terminated);
128 #else
129         return terminated;
130 #endif
131         */
132         return terminated;
133 }
134
135 void signal_handler(int signal)
136 {
137         XSEGLOG2(&lc, I, "Caught signal. Terminating gracefully");
138         terminated = 1;
139 #ifdef MT
140         wake_up_next_thread(global_peer);
141 #endif
142 }
143
144 void renew_logfile(int signal)
145 {
146         XSEGLOG2(&lc, I, "Caught signal. Renewing logfile");
147         renew_logctx(&lc, NULL, verbose, NULL, REOPEN_FILE);
148 }
149
150 static int setup_signals(struct peerd *peer)
151 {
152         int r;
153         struct sigaction sa;
154 #ifdef MT
155         global_peer = peer;
156 #endif
157         sigemptyset(&sa.sa_mask);
158         sa.sa_flags = 0;
159         sa.sa_handler = signal_handler;
160         r = sigaction(SIGTERM, &sa, NULL);
161         if (r < 0)
162                 return r;
163         r = sigaction(SIGINT, &sa, NULL);
164         if (r < 0)
165                 return r;
166         r = sigaction(SIGQUIT, &sa, NULL);
167         if (r < 0)
168                 return r;
169
170         sa.sa_handler = renew_logfile;
171         r = sigaction(SIGUSR1, &sa, NULL);
172         if (r < 0)
173                 return r;
174
175         return r;
176 }
177
178 inline int canDefer(struct peerd *peer)
179 {
180         return !(peer->defer_portno == NoPort);
181 }
182
183 void print_req(struct xseg *xseg, struct xseg_request *req)
184 {
185         char target[64], data[64];
186         char *req_target, *req_data;
187         unsigned int end = (req->targetlen> 63) ? 63 : req->targetlen;
188         req_target = xseg_get_target(xseg, req);
189         req_data = xseg_get_data(xseg, req);
190
191         if (1) {
192                 strncpy(target, req_target, end);
193                 target[end] = 0;
194                 strncpy(data, req_data, 63);
195                 data[63] = 0;
196                 printf("req id:%lu, op:%u %llu:%lu serviced: %lu, reqstate: %u\n"
197                                 "src: %u, st: %u, dst: %u dt: %u\n"
198                                 "target[%u]:'%s', data[%llu]:\n%s------------------\n\n",
199                                 (unsigned long)(req),
200                                 (unsigned int)req->op,
201                                 (unsigned long long)req->offset,
202                                 (unsigned long)req->size,
203                                 (unsigned long)req->serviced,
204                                 (unsigned int)req->state,
205                                 (unsigned int)req->src_portno,
206                                 (unsigned int)req->src_transit_portno,
207                                 (unsigned int)req->dst_portno,
208                                 (unsigned int)req->dst_transit_portno,
209                                 (unsigned int)req->targetlen, target,
210                                 (unsigned long long)req->datalen, data);
211         }
212 }
213 void log_pr(char *msg, struct peer_req *pr)
214 {
215         char target[64], data[64];
216         char *req_target, *req_data;
217         struct peerd *peer = pr->peer;
218         struct xseg *xseg = pr->peer->xseg;
219         req_target = xseg_get_target(xseg, pr->req);
220         req_data = xseg_get_data(xseg, pr->req);
221         /* null terminate name in case of req->target is less than 63 characters,
222          * and next character after name (aka first byte of next buffer) is not
223          * null
224          */
225         unsigned int end = (pr->req->targetlen> 63) ? 63 : pr->req->targetlen;
226         if (verbose) {
227                 strncpy(target, req_target, end);
228                 target[end] = 0;
229                 strncpy(data, req_data, 63);
230                 data[63] = 0;
231                 printf("%s: req id:%u, op:%u %llu:%lu serviced: %lu, retval: %lu, reqstate: %u\n"
232                                 "target[%u]:'%s', data[%llu]:\n%s------------------\n\n",
233                                 msg,
234                                 (unsigned int)(pr - peer->peer_reqs),
235                                 (unsigned int)pr->req->op,
236                                 (unsigned long long)pr->req->offset,
237                                 (unsigned long)pr->req->size,
238                                 (unsigned long)pr->req->serviced,
239                                 (unsigned long)pr->retval,
240                                 (unsigned int)pr->req->state,
241                                 (unsigned int)pr->req->targetlen, target,
242                                 (unsigned long long)pr->req->datalen, data);
243         }
244 }
245
246 inline struct peer_req *alloc_peer_req(struct peerd *peer)
247 {
248         xqindex idx = xq_pop_head(&peer->free_reqs, 1);
249         if (idx == Noneidx)
250                 return NULL;
251         return peer->peer_reqs + idx;
252 }
253
254 inline void free_peer_req(struct peerd *peer, struct peer_req *pr)
255 {
256         xqindex idx = pr - peer->peer_reqs;
257         pr->req = NULL;
258         xq_append_head(&peer->free_reqs, idx, 1);
259 }
260
/*
 * Counters for time spent responding. The code that updates them in
 * complete() is currently commented out, so they stay at zero.
 */
struct timeval resp_start, resp_end, resp_accum = {0, 0};
uint64_t responds = 0;

/*
 * Print the accumulated respond time and the number of responds to stdout.
 * The timeval fields are cast to unsigned long so the arguments match the
 * %lu conversions regardless of how time_t/suseconds_t are defined.
 */
void get_responds_stats(void)
{
	printf("Time waiting respond %lu.%06lu sec for %llu times.\n",
			(unsigned long) resp_accum.tv_sec,
			(unsigned long) resp_accum.tv_usec,
			(long long unsigned int) responds);
}
268
269 //FIXME error check
270 void fail(struct peerd *peer, struct peer_req *pr)
271 {
272         struct xseg_request *req = pr->req;
273         uint32_t p;
274         XSEGLOG2(&lc, D, "failing req %u", (unsigned int) (pr - peer->peer_reqs));
275         req->state |= XS_FAILED;
276         //xseg_set_req_data(peer->xseg, pr->req, NULL);
277         p = xseg_respond(peer->xseg, req, pr->portno, X_ALLOC);
278         xseg_signal(peer->xseg, p);
279         free_peer_req(peer, pr);
280 #ifdef MT
281         wake_up_next_thread(peer);
282 #endif
283 }
284
285 //FIXME error check
286 void complete(struct peerd *peer, struct peer_req *pr)
287 {
288         struct xseg_request *req = pr->req;
289         uint32_t p;
290         req->state |= XS_SERVED;
291         //xseg_set_req_data(peer->xseg, pr->req, NULL);
292         //gettimeofday(&resp_start, NULL);
293         p = xseg_respond(peer->xseg, req, pr->portno, X_ALLOC);
294         //gettimeofday(&resp_end, NULL);
295         //responds++;
296         //timersub(&resp_end, &resp_start, &resp_end);
297         //timeradd(&resp_end, &resp_accum, &resp_accum);
298         //printf("xseg_signal: %u\n", p);
299         xseg_signal(peer->xseg, p);
300         free_peer_req(peer, pr);
301 #ifdef MT
302         wake_up_next_thread(peer);
303 #endif
304 }
305
306 static void handle_accepted(struct peerd *peer, struct peer_req *pr, 
307                                 struct xseg_request *req)
308 {
309         struct xseg_request *xreq = pr->req;
310         //assert xreq == req;
311         XSEGLOG2(&lc, D, "Handle accepted");
312         xreq->serviced = 0;
313         //xreq->state = XS_ACCEPTED;
314         pr->retval = 0;
315         dispatch(peer, pr, req, dispatch_accept);
316 }
317
318 static void handle_received(struct peerd *peer, struct peer_req *pr,
319                                 struct xseg_request *req)
320 {
321         //struct xseg_request *req = pr->req;
322         //assert req->state != XS_ACCEPTED;
323         XSEGLOG2(&lc, D, "Handle received \n");
324         dispatch(peer, pr, req, dispatch_receive);
325
326 }
/*
 * Counters for time spent submitting. The code that updates them in
 * submit_peer_req() is currently commented out, so they stay at zero.
 */
struct timeval sub_start, sub_end, sub_accum = {0, 0};
uint64_t submits = 0;

/*
 * Print the accumulated submit time and the number of submits to stdout.
 * The timeval fields are cast to unsigned long so the arguments match the
 * %lu conversions regardless of how time_t/suseconds_t are defined.
 */
void get_submits_stats(void)
{
	printf("Time waiting submit %lu.%06lu sec for %llu times.\n",
			(unsigned long) sub_accum.tv_sec,
			(unsigned long) sub_accum.tv_usec,
			(long long unsigned int) submits);
}
334
335 int submit_peer_req(struct peerd *peer, struct peer_req *pr)
336 {
337         uint32_t ret;
338         struct xseg_request *req = pr->req;
339         // assert req->portno == peer->portno ?
340         //TODO small function with error checking
341         XSEGLOG2 (&lc, D, "submitting peer req %u\n", (unsigned int)(pr - peer->peer_reqs));
342         ret = xseg_set_req_data(peer->xseg, req, (void *)(pr));
343         if (ret < 0)
344                 return -1;
345         //printf("pr: %x , req_data: %x \n", pr, xseg_get_req_data(peer->xseg, req));
346         //gettimeofday(&sub_start, NULL);
347         ret = xseg_submit(peer->xseg, req, pr->portno, X_ALLOC);
348         //gettimeofday(&sub_end, NULL);
349         //submits++;
350         //timersub(&sub_end, &sub_start, &sub_end);
351         //timeradd(&sub_end, &sub_accum, &sub_accum);
352         if (ret == NoPort)
353                 return -1;
354         xseg_signal(peer->xseg, ret);
355         return 0;
356 }
357
358 static int check_ports(struct peerd *peer)
359 {
360         struct xseg *xseg = peer->xseg;
361         xport portno_start = peer->portno_start;
362         xport portno_end = peer->portno_end;
363         struct xseg_request *accepted, *received;
364         struct peer_req *pr;
365         xport i;
366         int  r, c = 0;
367
368         for (i = portno_start; i <= portno_end; i++) {
369                 accepted = NULL;
370                 received = NULL;
371                 if (!isTerminate()) {
372                         pr = alloc_peer_req(peer);
373                         if (pr) {
374                                 accepted = xseg_accept(xseg, i, X_NONBLOCK);
375                                 if (accepted) {
376                                         pr->req = accepted;
377                                         pr->portno = i;
378                                         xseg_cancel_wait(xseg, i);
379                                         handle_accepted(peer, pr, accepted);
380                                         c = 1;
381                                 }
382                                 else {
383                                         free_peer_req(peer, pr);
384                                 }
385                         }
386                 }
387                 received = xseg_receive(xseg, i, X_NONBLOCK);
388                 if (received) {
389                         r =  xseg_get_req_data(xseg, received, (void **) &pr);
390                         if (r < 0 || !pr){
391                                 XSEGLOG2(&lc, W, "Received request with no pr data\n");
392                                 xport p = xseg_respond(peer->xseg, received, peer->portno_start, X_ALLOC);
393                                 if (p == NoPort){
394                                         XSEGLOG2(&lc, W, "Could not respond stale request");
395                                         xseg_put_request(xseg, received, portno_start);
396                                         continue;
397                                 } else {
398                                         xseg_signal(xseg, p);
399                                 }
400                         } else {
401                                 //maybe perform sanity check for pr
402                                 xseg_cancel_wait(xseg, i);
403                                 handle_received(peer, pr, received);
404                                 c = 1;
405                         }
406                 }
407         }
408
409         return c;
410 }
411
412 #ifdef MT
413 int thread_execute(struct peerd *peer, void (*func)(void *arg), void *arg)
414 {
415         struct thread *t = alloc_thread(peer);
416         if (t) {
417                 t->func = func;
418                 t->arg = arg;
419                 wake_up_thread(t);
420                 return 0;
421         } else
422                 // we could hijack a thread
423                 return -1;
424 }
425
426 static void* thread_loop(void *arg)
427 {
428         struct thread *t = (struct thread *) arg;
429         struct peerd *peer = t->peer;
430         struct xseg *xseg = peer->xseg;
431         xport portno_start = peer->portno_start;
432         xport portno_end = peer->portno_end;
433         pid_t pid =syscall(SYS_gettid);
434         uint64_t loops;
435         uint64_t threshold=1000/(1 + portno_end - portno_start);
436
437         XSEGLOG2(&lc, D, "thread %u\n",  (unsigned int) (t- peer->thread));
438
439         XSEGLOG2(&lc, I, "Thread %u has tid %u.\n", (unsigned int) (t- peer->thread), pid);
440         xseg_init_local_signal(xseg, peer->portno_start);
441         for (;!(isTerminate() && xq_count(&peer->free_reqs) == peer->nr_ops);) {
442                 if (t->func) {
443                         XSEGLOG2(&lc, D, "Thread %u executes function\n", (unsigned int) (t- peer->thread));
444                         xseg_cancel_wait(xseg, peer->portno_start);
445                         t->func(t->arg);
446                         t->func = NULL;
447                         t->arg = NULL;
448                         continue;
449                 }
450
451                 for(loops= threshold; loops > 0; loops--) {
452                         if (loops == 1)
453                                 xseg_prepare_wait(xseg, peer->portno_start);
454                         if (check_ports(peer))
455                                 loops = threshold;
456                 }
457                 XSEGLOG2(&lc, I, "Thread %u goes to sleep\n", (unsigned int) (t- peer->thread));
458                 xseg_wait_signal(xseg, 10000000UL);
459                 xseg_cancel_wait(xseg, peer->portno_start);
460                 XSEGLOG2(&lc, I, "Thread %u woke up\n", (unsigned int) (t- peer->thread));
461         }
462         wake_up_next_thread(peer);
463         custom_peer_finalize(peer);
464         return NULL;
465 }
466
467 int peerd_start_threads(struct peerd *peer)
468 {
469         int i;
470         uint32_t nr_threads = peer->nr_threads;
471         //TODO err check
472         for (i = 0; i < nr_threads; i++) {
473                 peer->thread[i].peer = peer;
474                 pthread_cond_init(&peer->thread[i].cond,NULL);
475                 pthread_mutex_init(&peer->thread[i].lock, NULL);
476                 pthread_create(&peer->thread[i].tid, NULL, thread_loop, (void *)(peer->thread + i));
477                 peer->thread[i].func = NULL;
478                 peer->thread[i].arg = NULL;
479
480         }
481         return 0;
482 }
483 #endif
484
485
/*
 * Placeholder: deferring is not implemented and this function does
 * nothing. The disabled draft submitted pr->req to peer->defer_portno,
 * signalled that port and released pr; callers were expected to check
 * canDefer(peer) first.
 */
void defer_request(struct peerd *peer, struct peer_req *pr)
{
	(void) peer;
	(void) pr;
}
493
494 static int peerd_loop(struct peerd *peer) 
495 {
496 #ifdef MT
497         int i;
498         if (peer->interactive_func)
499                 peer->interactive_func();
500         for (i = 0; i < peer->nr_threads; i++) {
501                 pthread_join(peer->thread[i].tid, NULL);
502         }
503 #else
504         struct xseg *xseg = peer->xseg;
505         xport portno_start = peer->portno_start;
506         xport portno_end = peer->portno_end;
507         uint64_t threshold=1000/(1 + portno_end - portno_start);
508         pid_t pid =syscall(SYS_gettid);
509         uint64_t loops;
510
511         XSEGLOG2(&lc, I, "Peer has tid %u.\n", pid);
512         xseg_init_local_signal(xseg, peer->portno_start);
513         for (;!(isTerminate() && xq_count(&peer->free_reqs) == peer->nr_ops);) {
514                 for(loops= threshold; loops > 0; loops--) {
515                         if (loops == 1)
516                                 xseg_prepare_wait(xseg, peer->portno_start);
517                         if (check_ports(peer))
518                                 loops = threshold;
519                 }
520 #ifdef ST_THREADS
521                 if (ta){
522                         st_sleep(0);
523                 } else {
524 #endif
525                         XSEGLOG2(&lc, I, "Peer goes to sleep\n");
526                         xseg_wait_signal(xseg, 10000000UL);
527                         xseg_cancel_wait(xseg, peer->portno_start);
528                         XSEGLOG2(&lc, I, "Peer woke up\n");
529 #ifdef ST_THREADS
530                 }
531 #endif
532         }
533         custom_peer_finalize(peer);
534         xseg_quit_local_signal(xseg, peer->portno_start);
535 #endif
536         return 0;
537 }
538
539 static struct xseg *join(char *spec)
540 {
541         struct xseg_config config;
542         struct xseg *xseg;
543
544         (void)xseg_parse_spec(spec, &config);
545         xseg = xseg_join(config.type, config.name, PEER_TYPE, NULL);
546         if (xseg)
547                 return xseg;
548
549         (void)xseg_create(&config);
550         return xseg_join(config.type, config.name, PEER_TYPE, NULL);
551 }
552
553 static struct peerd* peerd_init(uint32_t nr_ops, char* spec, long portno_start,
554                         long portno_end, uint32_t nr_threads, uint32_t defer_portno)
555 {
556         int i;
557         struct peerd *peer;
558         struct xseg_port *port;
559
560 #ifdef ST_THREADS
561         st_init();
562 #endif
563         peer = malloc(sizeof(struct peerd));
564         if (!peer) {
565                 perror("malloc");
566                 return NULL;
567         }
568         peer->nr_ops = nr_ops;
569         peer->defer_portno = defer_portno;
570 #ifdef MT
571         peer->nr_threads = nr_threads;
572         peer->thread = calloc(nr_threads, sizeof(struct thread));
573         if (!peer->thread)
574                 goto malloc_fail;
575 #endif
576         peer->peer_reqs = calloc(nr_ops, sizeof(struct peer_req));
577         if (!peer->peer_reqs){
578 malloc_fail:
579                 perror("malloc");
580                 return NULL;
581         }
582
583         if (!xq_alloc_seq(&peer->free_reqs, nr_ops, nr_ops))
584                 goto malloc_fail;
585 #ifdef MT
586         if (!xq_alloc_empty(&peer->threads, nr_threads))
587                 goto malloc_fail;
588 #endif
589         if (xseg_initialize()){
590                 printf("cannot initialize library\n");
591                 return NULL;
592         }
593         peer->xseg = join(spec);
594         if (!peer->xseg) 
595                 return NULL;
596
597         peer->portno_start = (xport) portno_start;
598         peer->portno_end= (xport) portno_end;
599         port = xseg_bind_port(peer->xseg, peer->portno_start, NULL);
600         if (!port){
601                 printf("cannot bind to port %u\n", (unsigned int) peer->portno_start);
602                 return NULL;
603         }
604
605         xport p;
606         for (p = peer->portno_start + 1; p <= peer->portno_end; p++) {
607                 struct xseg_port *tmp;
608                 tmp = xseg_bind_port(peer->xseg, p, (void *)xseg_get_signal_desc(peer->xseg, port));
609                 if (!tmp){
610                         printf("cannot bind to port %u\n", (unsigned int) p);
611                         return NULL;
612                 }
613         }
614
615         printf("Peer on ports  %u-%u\n", peer->portno_start,
616                         peer->portno_end);
617
618         for (i = 0; i < nr_ops; i++) {
619                 peer->peer_reqs[i].peer = peer;
620                 peer->peer_reqs[i].req = NULL;
621                 peer->peer_reqs[i].retval = 0;
622                 peer->peer_reqs[i].priv = NULL;
623                 peer->peer_reqs[i].portno = NoPort;
624 #ifdef ST_THREADS
625                 peer->peer_reqs[i].cond = st_cond_new(); //FIXME err check
626 #endif
627         }
628 #ifdef MT
629         peer->interactive_func = NULL;
630 #endif
631         return peer;
632 }
633
/*
 * Close the pidfile descriptor and delete the pidfile.
 * Returns the result of unlink(path); the result of close() is ignored.
 */
int pidfile_remove(char *path, int fd)
{
	int rc;

	close(fd);
	rc = unlink(path);
	return rc;
}
639
/*
 * Write the id of the calling process, in decimal and without a trailing
 * newline, at the start of the file open on pid_fd.
 *
 * getpid() is used instead of a raw gettid system call: a pidfile
 * identifies the process, and the two values only coincide in the main
 * thread.
 *
 * Returns the number of bytes written, or -1 if pid_fd is invalid or the
 * write fails.
 */
int pidfile_write(int pid_fd)
{
	char buf[16];
	int len;

	if (pid_fd < 0)
		return -1;

	len = snprintf(buf, sizeof(buf), "%ld", (long) getpid());
	if (len < 0 || (size_t) len >= sizeof(buf))
		return -1;

	lseek(pid_fd, 0, SEEK_SET);
	return (int) write(pid_fd, buf, (size_t) len);
}
650
/*
 * Read a pid from the file at path.
 *
 * The file must contain nothing but a decimal number within its first 15
 * bytes (the format pidfile_write() produces). *pid is set to 0 first and
 * only overwritten on success.
 *
 * Returns 0 on success, -1 if the file cannot be opened or read, is
 * empty, or holds anything other than a number.
 */
int pidfile_read(char *path, pid_t *pid)
{
	char buf[16];
	char *endptr;
	ssize_t len;
	long value;
	int fd;

	*pid = 0;

	fd = open(path, O_RDONLY);
	if (fd < 0)
		return -1;
	len = read(fd, buf, sizeof(buf) - 1);
	close(fd);
	if (len <= 0)
		return -1;
	/* Terminate exactly after the bytes read; everything past them in
	 * buf is uninitialised and must not be parsed. */
	buf[len] = 0;

	errno = 0;
	value = strtol(buf, &endptr, 10);
	if (endptr == buf || endptr != &buf[len] || errno == ERANGE)
		return -1;

	*pid = (pid_t) value;
	return 0;
}
673
/*
 * Exclusively create the pidfile at path and open it for writing.
 *
 * *old_pid is always set: to 0 by default, or to the pid read from an
 * already existing pidfile when creation fails with EEXIST (it stays 0 if
 * that file cannot be parsed).
 *
 * Returns the open descriptor, or a negative value on failure.
 */
int pidfile_open(char *path, pid_t *old_pid)
{
	int fd;

	*old_pid = 0;

	//nfs version > 3
	/* O_CREAT requires an explicit mode argument. */
	fd = open(path, O_CREAT|O_EXCL|O_WRONLY, 0644);
	if (fd < 0 && errno == EEXIST)
		pidfile_read(path, old_pid);
	return fd;
}
684
/*
 * Print the command line help to stderr: a usage line built from argv0,
 * the table of general peer options (the -t row only in the MT build),
 * and then the peer-specific help via custom_peer_usage().
 */
void usage(char *argv0)
{
	static const char general_options[] =
		"General peer options:\n"
		"  Option      | Default | \n"
		"  --------------------------------------------\n"
		"    -g        | None    | Segment spec to join\n"
		"    -sp       | NoPort  | Start portno to bind\n"
		"    -ep       | NoPort  | End portno to bind\n"
		"    -p        | NoPort  | Portno to bind\n"
		"    -n        | 16      | Number of ops\n"
		"    -v        | 0       | Verbosity level\n"
		"    -l        | None    | Logfile \n"
		"    -d        | No      | Daemonize \n"
		"    --pidfile | None    | Pidfile \n"
#ifdef MT
		"    -t        | No      | Number of threads \n"
#endif
		"\n";

	fprintf(stderr, "Usage: %s [general options] [custom peer options]\n\n", argv0);
	fprintf(stderr, "%s", general_options);
	custom_peer_usage();
}
707
708 int main(int argc, char *argv[])
709 {
710         struct peerd *peer = NULL;
711         //parse args
712         int r;
713         long portno_start = -1, portno_end = -1, portno = -1;
714
715         //set defaults here
716         int daemonize = 0, help = 0;
717         uint32_t nr_ops = 16;
718         uint32_t nr_threads = 1;
719         unsigned int debug_level = 0;
720         uint32_t defer_portno = NoPort;
721         pid_t old_pid;
722         int pid_fd = -1;
723
724         char spec[MAX_SPEC_LEN + 1];
725         char logfile[MAX_LOGFILE_LEN + 1];
726         char pidfile[MAX_PIDFILE_LEN + 1];
727
728         logfile[0] = 0;
729         pidfile[0] = 0;
730         spec[0] = 0;
731
732         //capture here -g spec, -n nr_ops, -p portno, -t nr_threads -v verbose level
733         // -dp xseg_portno to defer blocking requests
734         // -l log file ?
735         //TODO print messages on arg parsing error
736         BEGIN_READ_ARGS(argc, argv);
737         READ_ARG_STRING("-g", spec, MAX_SPEC_LEN);
738         READ_ARG_ULONG("-sp", portno_start);
739         READ_ARG_ULONG("-ep", portno_end);
740         READ_ARG_ULONG("-p", portno);
741         READ_ARG_ULONG("-n", nr_ops);
742         READ_ARG_ULONG("-v", debug_level);
743 #ifdef MT
744         READ_ARG_ULONG("-t", nr_threads);
745 #endif
746 //      READ_ARG_ULONG("-dp", defer_portno);
747         READ_ARG_STRING("-l", logfile, MAX_LOGFILE_LEN);
748         READ_ARG_BOOL("-d", daemonize);
749         READ_ARG_BOOL("-h", help);
750         READ_ARG_BOOL("--help", help);
751         READ_ARG_STRING("--pidfile", pidfile, MAX_PIDFILE_LEN);
752         END_READ_ARGS();
753
754         if (help){
755                 usage(argv[0]);
756                 return 0;
757         }
758
759         r = init_logctx(&lc, argv[0], debug_level, logfile,
760                         REDIRECT_STDOUT|REDIRECT_STDERR);
761         if (r < 0){
762                 XSEGLOG("Cannot initialize logging to logfile");
763                 return -1;
764         }
765         XSEGLOG2(&lc, D, "Main thread has tid %ld.\n", syscall(SYS_gettid));
766
767         if (pidfile[0]){
768                 pid_fd = pidfile_open(pidfile, &old_pid);
769                 if (pid_fd < 0) {
770                         if (old_pid) {
771                                 XSEGLOG2(&lc, E, "Daemon already running, pid: %d.", old_pid);
772                         } else {
773                                 XSEGLOG2(&lc, E, "Cannot open or create pidfile");
774                         }
775                         return -1;
776                 }
777         }
778
779         if (daemonize){
780                 if (daemon(0, 1) < 0){
781                         XSEGLOG2(&lc, E, "Cannot daemonize");
782                         r = -1;
783                         goto out;
784                 }
785         }
786
787         pidfile_write(pid_fd);
788
789         //TODO perform argument sanity checks
790         verbose = debug_level;
791         if (portno != -1) {
792                 portno_start = portno;
793                 portno_end = portno;
794         }
795         if (portno_start == -1 || portno_end == -1){
796                 XSEGLOG2(&lc, E, "Portno or {portno_start, portno_end} must be supplied");
797                 usage(argv[0]);
798                 r = -1;
799                 goto out;
800         }
801
802         peer = peerd_init(nr_ops, spec, portno_start, portno_end, nr_threads, defer_portno);
803         if (!peer){
804                 r = -1;
805                 goto out;
806         }
807         setup_signals(peer);
808         r = custom_peer_init(peer, argc, argv);
809         if (r < 0)
810                 goto out;
811 #ifdef MT
812         //TODO err check
813         peerd_start_threads(peer);
814 #endif
815
816 #ifdef ST_THREADS
817         st_thread_t st = st_thread_create(peerd_loop, peer, 1, 0);
818         r = st_thread_join(st, NULL);
819 #else
820         r = peerd_loop(peer);
821 #endif
822 out:
823         if (pid_fd > 0)
824                 pidfile_remove(pidfile, pid_fd);
825         return r;
826 }