Fix segfault
[archipelago] / xseg / peers / user / peer.c
1 /*
2  * Copyright 2012 GRNET S.A. All rights reserved.
3  *
4  * Redistribution and use in source and binary forms, with or
5  * without modification, are permitted provided that the following
6  * conditions are met:
7  *
8  *   1. Redistributions of source code must retain the above
9  *      copyright notice, this list of conditions and the following
10  *      disclaimer.
11  *   2. Redistributions in binary form must reproduce the above
12  *      copyright notice, this list of conditions and the following
13  *      disclaimer in the documentation and/or other materials
14  *      provided with the distribution.
15  *
16  * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17  * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18  * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19  * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20  * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23  * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24  * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25  * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26  * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27  * POSSIBILITY OF SUCH DAMAGE.
28  *
29  * The views and conclusions contained in the software and
30  * documentation are those of the authors and should not be
31  * interpreted as representing official policies, either expressed
32  * or implied, of GRNET S.A.
33  */
34
35 #define _GNU_SOURCE
36 #include <stdio.h>
37 #include <stdlib.h>
38 #include <sys/types.h>
39 #include <unistd.h>
40 #include <sys/syscall.h>
41 #include <sys/time.h>
42 #include <signal.h>
43 #include <sys/stat.h>
44 #include <fcntl.h>
45 #include <errno.h>
46
47 #ifdef MT
48 #include <pthread.h>
49 #endif
50
51 #include <xseg/xseg.h>
52 #include <peer.h>
53
54 #ifdef MT
55 #define PEER_TYPE "pthread"
56 #else
57 #define PEER_TYPE "posix"
58 #endif
59
60 //FIXME this should not be defined here probably
61 #define MAX_SPEC_LEN 128
62 #define MAX_PIDFILE_LEN 512
63
64 volatile unsigned int terminated = 0;
65 unsigned int verbose = 0;
66 struct log_ctx lc;
67 #ifdef ST_THREADS
68 uint32_t ta = 0;
69 #endif
70
71 #ifdef MT
72 struct peerd *global_peer;
73
74 struct thread {
75         struct peerd *peer;
76         pthread_t tid;
77         pthread_cond_t cond;
78         pthread_mutex_t lock;
79         void (*func)(void *arg);
80         void *arg;
81 };
82
83 inline static struct thread* alloc_thread(struct peerd *peer)
84 {
85         xqindex idx = xq_pop_head(&peer->threads, 1);
86         if (idx == Noneidx)
87                 return NULL;
88         return peer->thread + idx;
89 }
90
91 inline static void free_thread(struct peerd *peer, struct thread *t)
92 {
93         xqindex idx = t - peer->thread;
94         xq_append_head(&peer->threads, idx, 1);
95 }
96
97
98 inline static void __wake_up_thread(struct thread *t)
99 {
100         pthread_mutex_lock(&t->lock);
101         pthread_cond_signal(&t->cond);
102         pthread_mutex_unlock(&t->lock);
103 }
104
105 inline static void wake_up_thread(struct thread* t)
106 {
107         if (t){
108                 __wake_up_thread(t);
109         }
110 }
111
112 inline static int wake_up_next_thread(struct peerd *peer)
113 {
114         return (xseg_signal(peer->xseg, peer->portno_start));
115 }
116 #endif
117
118 /*
119  * extern is needed if this function is going to be called by another file
120  * such as bench-xseg.c
121  */
122
123 void signal_handler(int signal)
124 {
125         XSEGLOG2(&lc, I, "Caught signal. Terminating gracefully");
126         terminated = 1;
127 #ifdef MT
128         wake_up_next_thread(global_peer);
129 #endif
130 }
131
132 void renew_logfile(int signal)
133 {
134         XSEGLOG2(&lc, I, "Caught signal. Renewing logfile");
135         renew_logctx(&lc, NULL, verbose, NULL, REOPEN_FILE);
136 }
137
138 static int setup_signals(struct peerd *peer)
139 {
140         int r;
141         struct sigaction sa;
142 #ifdef MT
143         global_peer = peer;
144 #endif
145         sigemptyset(&sa.sa_mask);
146         sa.sa_flags = 0;
147         sa.sa_handler = signal_handler;
148         r = sigaction(SIGTERM, &sa, NULL);
149         if (r < 0)
150                 return r;
151         r = sigaction(SIGINT, &sa, NULL);
152         if (r < 0)
153                 return r;
154         r = sigaction(SIGQUIT, &sa, NULL);
155         if (r < 0)
156                 return r;
157
158         sa.sa_handler = renew_logfile;
159         r = sigaction(SIGUSR1, &sa, NULL);
160         if (r < 0)
161                 return r;
162
163         return r;
164 }
165
166 inline int canDefer(struct peerd *peer)
167 {
168         return !(peer->defer_portno == NoPort);
169 }
170
171 void print_req(struct xseg *xseg, struct xseg_request *req)
172 {
173         char target[64], data[64];
174         char *req_target, *req_data;
175         unsigned int end = (req->targetlen> 63) ? 63 : req->targetlen;
176         req_target = xseg_get_target(xseg, req);
177         req_data = xseg_get_data(xseg, req);
178
179         if (1) {
180                 strncpy(target, req_target, end);
181                 target[end] = 0;
182                 strncpy(data, req_data, 63);
183                 data[63] = 0;
184                 printf("req id:%lu, op:%u %llu:%lu serviced: %lu, reqstate: %u\n"
185                                 "src: %u, transit: %u, dst: %u effective dst: %u\n"
186                                 "target[%u]:'%s', data[%llu]:\n%s------------------\n\n",
187                                 (unsigned long)(req),
188                                 (unsigned int)req->op,
189                                 (unsigned long long)req->offset,
190                                 (unsigned long)req->size,
191                                 (unsigned long)req->serviced,
192                                 (unsigned int)req->state,
193                                 (unsigned int)req->src_portno,
194                                 (unsigned int)req->transit_portno,
195                                 (unsigned int)req->dst_portno,
196                                 (unsigned int)req->effective_dst_portno,
197                                 (unsigned int)req->targetlen, target,
198                                 (unsigned long long)req->datalen, data);
199         }
200 }
201 void log_pr(char *msg, struct peer_req *pr)
202 {
203         char target[64], data[64];
204         char *req_target, *req_data;
205         struct peerd *peer = pr->peer;
206         struct xseg *xseg = pr->peer->xseg;
207         req_target = xseg_get_target(xseg, pr->req);
208         req_data = xseg_get_data(xseg, pr->req);
209         /* null terminate name in case of req->target is less than 63 characters,
210          * and next character after name (aka first byte of next buffer) is not
211          * null
212          */
213         unsigned int end = (pr->req->targetlen> 63) ? 63 : pr->req->targetlen;
214         if (verbose) {
215                 strncpy(target, req_target, end);
216                 target[end] = 0;
217                 strncpy(data, req_data, 63);
218                 data[63] = 0;
219                 printf("%s: req id:%u, op:%u %llu:%lu serviced: %lu, retval: %lu, reqstate: %u\n"
220                                 "target[%u]:'%s', data[%llu]:\n%s------------------\n\n",
221                                 msg,
222                                 (unsigned int)(pr - peer->peer_reqs),
223                                 (unsigned int)pr->req->op,
224                                 (unsigned long long)pr->req->offset,
225                                 (unsigned long)pr->req->size,
226                                 (unsigned long)pr->req->serviced,
227                                 (unsigned long)pr->retval,
228                                 (unsigned int)pr->req->state,
229                                 (unsigned int)pr->req->targetlen, target,
230                                 (unsigned long long)pr->req->datalen, data);
231         }
232 }
233
234 inline struct peer_req *alloc_peer_req(struct peerd *peer)
235 {
236         xqindex idx = xq_pop_head(&peer->free_reqs, 1);
237         if (idx == Noneidx)
238                 return NULL;
239         return peer->peer_reqs + idx;
240 }
241
242 inline void free_peer_req(struct peerd *peer, struct peer_req *pr)
243 {
244         xqindex idx = pr - peer->peer_reqs;
245         pr->req = NULL;
246         xq_append_head(&peer->free_reqs, idx, 1);
247 }
248
249 struct timeval resp_start, resp_end, resp_accum = {0, 0};
250 uint64_t responds = 0;
251 void get_responds_stats(){
252                 printf("Time waiting respond %lu.%06lu sec for %llu times.\n",
253                                 //(unsigned int)(t - peer->thread),
254                                 resp_accum.tv_sec, resp_accum.tv_usec, (long long unsigned int) responds);
255 }
256
257 //FIXME error check
258 void fail(struct peerd *peer, struct peer_req *pr)
259 {
260         struct xseg_request *req = pr->req;
261         uint32_t p;
262         XSEGLOG2(&lc, D, "failing req %u", (unsigned int) (pr - peer->peer_reqs));
263         req->state |= XS_FAILED;
264         //xseg_set_req_data(peer->xseg, pr->req, NULL);
265         p = xseg_respond(peer->xseg, req, pr->portno, X_ALLOC);
266         xseg_signal(peer->xseg, p);
267         free_peer_req(peer, pr);
268 #ifdef MT
269         wake_up_next_thread(peer);
270 #endif
271 }
272
273 //FIXME error check
274 void complete(struct peerd *peer, struct peer_req *pr)
275 {
276         struct xseg_request *req = pr->req;
277         uint32_t p;
278         req->state |= XS_SERVED;
279         //xseg_set_req_data(peer->xseg, pr->req, NULL);
280         //gettimeofday(&resp_start, NULL);
281         p = xseg_respond(peer->xseg, req, pr->portno, X_ALLOC);
282         //gettimeofday(&resp_end, NULL);
283         //responds++;
284         //timersub(&resp_end, &resp_start, &resp_end);
285         //timeradd(&resp_end, &resp_accum, &resp_accum);
286         //printf("xseg_signal: %u\n", p);
287         xseg_signal(peer->xseg, p);
288         free_peer_req(peer, pr);
289 #ifdef MT
290         wake_up_next_thread(peer);
291 #endif
292 }
293
294 static void handle_accepted(struct peerd *peer, struct peer_req *pr, 
295                                 struct xseg_request *req)
296 {
297         struct xseg_request *xreq = pr->req;
298         //assert xreq == req;
299         XSEGLOG2(&lc, D, "Handle accepted");
300         xreq->serviced = 0;
301         //xreq->state = XS_ACCEPTED;
302         pr->retval = 0;
303         dispatch(peer, pr, req, dispatch_accept);
304 }
305
306 static void handle_received(struct peerd *peer, struct peer_req *pr,
307                                 struct xseg_request *req)
308 {
309         //struct xseg_request *req = pr->req;
310         //assert req->state != XS_ACCEPTED;
311         XSEGLOG2(&lc, D, "Handle received \n");
312         dispatch(peer, pr, req, dispatch_receive);
313
314 }
315 struct timeval sub_start, sub_end, sub_accum = {0, 0};
316 uint64_t submits = 0;
317 void get_submits_stats(){
318                 printf("Time waiting submit %lu.%06lu sec for %llu times.\n",
319                                 //(unsigned int)(t - peer->thread),
320                                 sub_accum.tv_sec, sub_accum.tv_usec, (long long unsigned int) submits);
321 }
322
323 int submit_peer_req(struct peerd *peer, struct peer_req *pr)
324 {
325         uint32_t ret;
326         struct xseg_request *req = pr->req;
327         // assert req->portno == peer->portno ?
328         //TODO small function with error checking
329         XSEGLOG2 (&lc, D, "submitting peer req %u\n", (unsigned int)(pr - peer->peer_reqs));
330         ret = xseg_set_req_data(peer->xseg, req, (void *)(pr));
331         if (ret < 0)
332                 return -1;
333         //printf("pr: %x , req_data: %x \n", pr, xseg_get_req_data(peer->xseg, req));
334         //gettimeofday(&sub_start, NULL);
335         ret = xseg_submit(peer->xseg, req, pr->portno, X_ALLOC);
336         //gettimeofday(&sub_end, NULL);
337         //submits++;
338         //timersub(&sub_end, &sub_start, &sub_end);
339         //timeradd(&sub_end, &sub_accum, &sub_accum);
340         if (ret == NoPort)
341                 return -1;
342         xseg_signal(peer->xseg, ret);
343         return 0;
344 }
345
346 int check_ports(struct peerd *peer)
347 {
348         struct xseg *xseg = peer->xseg;
349         xport portno_start = peer->portno_start;
350         xport portno_end = peer->portno_end;
351         struct xseg_request *accepted, *received;
352         struct peer_req *pr;
353         xport i;
354         int  r, c = 0;
355
356         for (i = portno_start; i <= portno_end; i++) {
357                 accepted = NULL;
358                 received = NULL;
359                 if (!isTerminate()) {
360                         pr = alloc_peer_req(peer);
361                         if (pr) {
362                                 accepted = xseg_accept(xseg, i, X_NONBLOCK);
363                                 if (accepted) {
364                                         pr->req = accepted;
365                                         pr->portno = i;
366                                         xseg_cancel_wait(xseg, i);
367                                         handle_accepted(peer, pr, accepted);
368                                         c = 1;
369                                 }
370                                 else {
371                                         free_peer_req(peer, pr);
372                                 }
373                         }
374                 }
375                 received = xseg_receive(xseg, i, X_NONBLOCK);
376                 if (received) {
377                         r =  xseg_get_req_data(xseg, received, (void **) &pr);
378                         if (r < 0 || !pr){
379                                 XSEGLOG2(&lc, W, "Received request with no pr data\n");
380                                 xport p = xseg_respond(peer->xseg, received, peer->portno_start, X_ALLOC);
381                                 if (p == NoPort){
382                                         XSEGLOG2(&lc, W, "Could not respond stale request");
383                                         xseg_put_request(xseg, received, portno_start);
384                                         continue;
385                                 } else {
386                                         xseg_signal(xseg, p);
387                                 }
388                         } else {
389                                 //maybe perform sanity check for pr
390                                 xseg_cancel_wait(xseg, i);
391                                 handle_received(peer, pr, received);
392                                 c = 1;
393                         }
394                 }
395         }
396
397         return c;
398 }
399
400 #ifdef MT
401 static int peerd_loop(void *arg);
402
403 static void* thread_loop(void *arg)
404 {
405         struct thread *t = (struct thread *) arg;
406         struct peerd *peer = t->peer;
407         struct xseg *xseg = peer->xseg;
408         xport portno_start = peer->portno_start;
409         xport portno_end = peer->portno_end;
410         pid_t pid =syscall(SYS_gettid);
411         uint64_t loops;
412         uint64_t threshold=1000/(1 + portno_end - portno_start);
413
414         XSEGLOG2(&lc, D, "thread %u\n",  (unsigned int) (t- peer->thread));
415         XSEGLOG2(&lc, I, "Thread %u has tid %u.\n", (unsigned int) (t- peer->thread), pid);
416         xseg_init_local_signal(xseg, peer->portno_start);
417         for (;!(isTerminate() && xq_count(&peer->free_reqs) == peer->nr_ops);) {
418                 for(loops =  threshold; loops > 0; loops--) {
419                         if (loops == 1)
420                                 xseg_prepare_wait(xseg, peer->portno_start);
421                         if (check_ports(peer))
422                                 loops = threshold;
423                 }
424                 XSEGLOG2(&lc, I, "Thread %u goes to sleep\n", (unsigned int) (t- peer->thread));
425                 xseg_wait_signal(xseg, 10000000UL);
426                 xseg_cancel_wait(xseg, peer->portno_start);
427                 XSEGLOG2(&lc, I, "Thread %u woke up\n", (unsigned int) (t- peer->thread));
428         }
429         wake_up_next_thread(peer);
430         custom_peer_finalize(peer);
431         return NULL;
432 }
433
434 int peerd_start_threads(struct peerd *peer)
435 {
436         int i;
437         uint32_t nr_threads = peer->nr_threads;
438         //TODO err check
439         for (i = 0; i < nr_threads; i++) {
440                 peer->thread[i].peer = peer;
441                 pthread_cond_init(&peer->thread[i].cond,NULL);
442                 pthread_mutex_init(&peer->thread[i].lock, NULL);
443                 if (peer->custom_peerd_loop)
444                         pthread_create(&peer->thread[i].tid, NULL, peer->custom_peerd_loop, (void *)(peer->thread + i));
445                 else
446                         pthread_create(&peer->thread[i].tid, NULL, peerd_loop, (void *)(peer->thread + i));
447                 peer->thread[i].func = NULL;
448                 peer->thread[i].arg = NULL;
449         }
450
451         if (peer->interactive_func)
452                 peer->interactive_func();
453         for (i = 0; i < nr_threads; i++) {
454                 pthread_join(peer->thread[i].tid, NULL);
455         }
456
457         return 0;
458 }
459 #endif
460
461
462 int defer_request(struct peerd *peer, struct peer_req *pr)
463 {
464         int r;
465         xport p;
466         if (!canDefer(peer)){
467                 XSEGLOG2(&lc, E, "Peer cannot defer requests");
468                 return -1;
469         }
470         p = xseg_forward(peer->xseg, pr->req, peer->defer_portno, pr->portno,
471                         X_ALLOC);
472         if (p == NoPort){
473                 XSEGLOG2(&lc, E, "Cannot defer request %lx", pr->req);
474                 return -1;
475         }
476         r = xseg_signal(peer->xseg, p);
477         if (r < 0) {
478                 XSEGLOG2(&lc, W, "Cannot signal port %lu", p);
479         }
480         free_peer_req(peer, pr);
481         return 0;
482 }
483
484 static int peerd_loop(void *arg)
485 {
486 #ifdef MT
487         struct thread *t = (struct thread *) arg;
488         struct peerd *peer = t->peer;
489 #else
490         struct peerd *peer = (struct peerd *) arg;
491 #endif
492         struct xseg *xseg = peer->xseg;
493         xport portno_start = peer->portno_start;
494         xport portno_end = peer->portno_end;
495         pid_t pid =syscall(SYS_gettid);
496         uint64_t threshold=1000/(1 + portno_end - portno_start);
497         uint64_t loops;
498
499         XSEGLOG2(&lc, I, "Peer has tid %u.\n", pid);
500         xseg_init_local_signal(xseg, peer->portno_start);
501         for (;!(isTerminate() && xq_count(&peer->free_reqs) == peer->nr_ops);) {
502                 for(loops= threshold; loops > 0; loops--) {
503                         if (loops == 1)
504                                 xseg_prepare_wait(xseg, peer->portno_start);
505                         if (check_ports(peer))
506                                 loops = threshold;
507                 }
508 #ifdef ST_THREADS
509                 if (ta){
510                         st_sleep(0);
511                         continue;
512                 }
513 #endif
514                 XSEGLOG2(&lc, I, "Peer goes to sleep\n");
515                 xseg_wait_signal(xseg, 10000000UL);
516                 xseg_cancel_wait(xseg, peer->portno_start);
517                 XSEGLOG2(&lc, I, "Peer woke up\n");
518         }
519 #ifdef MT
520         wake_up_next_thread(peer);
521         custom_peer_finalize(peer);
522 #else
523         custom_peer_finalize(peer);
524         xseg_quit_local_signal(xseg, peer->portno_start);
525 #endif
526         return 0;
527 }
528
529 static struct xseg *join(char *spec)
530 {
531         struct xseg_config config;
532         struct xseg *xseg;
533
534         (void)xseg_parse_spec(spec, &config);
535         xseg = xseg_join(config.type, config.name, PEER_TYPE, NULL);
536         if (xseg)
537                 return xseg;
538
539         (void)xseg_create(&config);
540         return xseg_join(config.type, config.name, PEER_TYPE, NULL);
541 }
542
543 static struct peerd* peerd_init(uint32_t nr_ops, char* spec, long portno_start,
544                         long portno_end, uint32_t nr_threads, xport defer_portno)
545 {
546         int i;
547         struct peerd *peer;
548         struct xseg_port *port;
549
550 #ifdef ST_THREADS
551         st_init();
552 #endif
553         peer = malloc(sizeof(struct peerd));
554         if (!peer) {
555                 perror("malloc");
556                 return NULL;
557         }
558         peer->nr_ops = nr_ops;
559         peer->defer_portno = defer_portno;
560 #ifdef MT
561         peer->nr_threads = nr_threads;
562         peer->thread = calloc(nr_threads, sizeof(struct thread));
563         if (!peer->thread)
564                 goto malloc_fail;
565 #endif
566         peer->peer_reqs = calloc(nr_ops, sizeof(struct peer_req));
567         if (!peer->peer_reqs){
568 malloc_fail:
569                 perror("malloc");
570                 return NULL;
571         }
572
573         if (!xq_alloc_seq(&peer->free_reqs, nr_ops, nr_ops))
574                 goto malloc_fail;
575 #ifdef MT
576         if (!xq_alloc_empty(&peer->threads, nr_threads))
577                 goto malloc_fail;
578 #endif
579         if (xseg_initialize()){
580                 printf("cannot initialize library\n");
581                 return NULL;
582         }
583         peer->xseg = join(spec);
584         if (!peer->xseg)
585                 return NULL;
586
587         peer->portno_start = (xport) portno_start;
588         peer->portno_end= (xport) portno_end;
589         port = xseg_bind_port(peer->xseg, peer->portno_start, NULL);
590         if (!port){
591                 printf("cannot bind to port %u\n", (unsigned int) peer->portno_start);
592                 return NULL;
593         }
594
595         xport p;
596         for (p = peer->portno_start + 1; p <= peer->portno_end; p++) {
597                 struct xseg_port *tmp;
598                 tmp = xseg_bind_port(peer->xseg, p, (void *)xseg_get_signal_desc(peer->xseg, port));
599                 if (!tmp){
600                         printf("cannot bind to port %u\n", (unsigned int) p);
601                         return NULL;
602                 }
603         }
604
605         printf("Peer on ports  %u-%u\n", peer->portno_start,
606                         peer->portno_end);
607
608         for (i = 0; i < nr_ops; i++) {
609                 peer->peer_reqs[i].peer = peer;
610                 peer->peer_reqs[i].req = NULL;
611                 peer->peer_reqs[i].retval = 0;
612                 peer->peer_reqs[i].priv = NULL;
613                 peer->peer_reqs[i].portno = NoPort;
614 #ifdef ST_THREADS
615                 peer->peer_reqs[i].cond = st_cond_new(); //FIXME err check
616 #endif
617         }
618 #ifdef MT
619         peer->interactive_func = NULL;
620 #endif
621         return peer;
622 }
623
624 int pidfile_remove(char *path, int fd)
625 {
626         close(fd);
627         return (unlink(path));
628 }
629
630 int pidfile_write(int pid_fd)
631 {
632         char buf[16];
633         snprintf(buf, sizeof(buf), "%ld", syscall(SYS_gettid));
634         buf[15] = 0;
635
636         lseek(pid_fd, 0, SEEK_SET);
637         int ret = write(pid_fd, buf, strlen(buf));
638         return ret;
639 }
640
641 int pidfile_read(char *path, pid_t *pid)
642 {
643         char buf[16], *endptr;
644         *pid = 0;
645
646         int fd = open(path, O_RDONLY);
647         if (fd < 0)
648                 return -1;
649         int ret = read(fd, buf, 15);
650         buf[15]=0;
651         close(fd);
652         if (ret < 0)
653                 return -1;
654         else{
655                 *pid = strtol(buf, &endptr, 10);
656                 if (endptr != &buf[ret]){
657                         *pid = 0;
658                         return -1;
659                 }
660         }
661         return 0;
662 }
663
664 int pidfile_open(char *path, pid_t *old_pid)
665 {
666         //nfs version > 3
667         int fd = open(path, O_CREAT|O_EXCL|O_WRONLY, S_IWUSR);
668         if (fd < 0){
669                 if (errno == EEXIST)
670                         pidfile_read(path, old_pid);
671         }
672         return fd;
673 }
674
675 void usage(char *argv0)
676 {
677         fprintf(stderr, "Usage: %s [general options] [custom peer options]\n\n", argv0);
678         fprintf(stderr, "General peer options:\n"
679                 "  Option      | Default | \n"
680                 "  --------------------------------------------\n"
681                 "    -g        | None    | Segment spec to join\n"
682                 "    -sp       | NoPort  | Start portno to bind\n"
683                 "    -ep       | NoPort  | End portno to bind\n"
684                 "    -p        | NoPort  | Portno to bind\n"
685                 "    -n        | 16      | Number of ops\n"
686                 "    -v        | 0       | Verbosity level\n"
687                 "    -l        | None    | Logfile \n"
688                 "    -d        | No      | Daemonize \n"
689                 "    --pidfile | None    | Pidfile \n"
690 #ifdef MT
691                 "    -t        | No      | Number of threads \n"
692 #endif
693                 "\n"
694                );
695         custom_peer_usage();
696 }
697
698 int main(int argc, char *argv[])
699 {
700         struct peerd *peer = NULL;
701         //parse args
702         int r;
703         long portno_start = -1, portno_end = -1, portno = -1;
704
705         //set defaults here
706         int daemonize = 0, help = 0;
707         uint32_t nr_ops = 16;
708         uint32_t nr_threads = 1;
709         unsigned int debug_level = 0;
710         xport defer_portno = NoPort;
711         pid_t old_pid;
712         int pid_fd = -1;
713
714         char spec[MAX_SPEC_LEN + 1];
715         char logfile[MAX_LOGFILE_LEN + 1];
716         char pidfile[MAX_PIDFILE_LEN + 1];
717
718         logfile[0] = 0;
719         pidfile[0] = 0;
720         spec[0] = 0;
721
722         //capture here -g spec, -n nr_ops, -p portno, -t nr_threads -v verbose level
723         // -dp xseg_portno to defer blocking requests
724         // -l log file ?
725         //TODO print messages on arg parsing error
726         BEGIN_READ_ARGS(argc, argv);
727         READ_ARG_STRING("-g", spec, MAX_SPEC_LEN);
728         READ_ARG_ULONG("-sp", portno_start);
729         READ_ARG_ULONG("-ep", portno_end);
730         READ_ARG_ULONG("-p", portno);
731         READ_ARG_ULONG("-n", nr_ops);
732         READ_ARG_ULONG("-v", debug_level);
733 #ifdef MT
734         READ_ARG_ULONG("-t", nr_threads);
735 #endif
736         READ_ARG_ULONG("-dp", defer_portno);
737         READ_ARG_STRING("-l", logfile, MAX_LOGFILE_LEN);
738         READ_ARG_BOOL("-d", daemonize);
739         READ_ARG_BOOL("-h", help);
740         READ_ARG_BOOL("--help", help);
741         READ_ARG_STRING("--pidfile", pidfile, MAX_PIDFILE_LEN);
742         END_READ_ARGS();
743
744         if (help){
745                 usage(argv[0]);
746                 return 0;
747         }
748
749         r = init_logctx(&lc, argv[0], debug_level, logfile,
750                         REDIRECT_STDOUT|REDIRECT_STDERR);
751         if (r < 0){
752                 XSEGLOG("Cannot initialize logging to logfile");
753                 return -1;
754         }
755         XSEGLOG2(&lc, D, "Main thread has tid %ld.\n", syscall(SYS_gettid));
756
757         if (pidfile[0]){
758                 pid_fd = pidfile_open(pidfile, &old_pid);
759                 if (pid_fd < 0) {
760                         if (old_pid) {
761                                 XSEGLOG2(&lc, E, "Daemon already running, pid: %d.", old_pid);
762                         } else {
763                                 XSEGLOG2(&lc, E, "Cannot open or create pidfile");
764                         }
765                         return -1;
766                 }
767         }
768
769         if (daemonize){
770                 if (daemon(0, 1) < 0){
771                         XSEGLOG2(&lc, E, "Cannot daemonize");
772                         r = -1;
773                         goto out;
774                 }
775         }
776
777         pidfile_write(pid_fd);
778
779         //TODO perform argument sanity checks
780         verbose = debug_level;
781         if (portno != -1) {
782                 portno_start = portno;
783                 portno_end = portno;
784         }
785         if (portno_start == -1 || portno_end == -1){
786                 XSEGLOG2(&lc, E, "Portno or {portno_start, portno_end} must be supplied");
787                 usage(argv[0]);
788                 r = -1;
789                 goto out;
790         }
791
792         peer = peerd_init(nr_ops, spec, portno_start, portno_end, nr_threads, defer_portno);
793         if (!peer){
794                 r = -1;
795                 goto out;
796         }
797         setup_signals(peer);
798         r = custom_peer_init(peer, argc, argv);
799         if (r < 0)
800                 goto out;
801 #if defined(MT)
802         //TODO err check
803         peerd_start_threads(peer);
804 #elif defined(ST_THREADS)
805         st_thread_t st = st_thread_create(peerd_loop, peer, 1, 0);
806         r = st_thread_join(st, NULL);
807 #else
808         if (peer->custom_peerd_loop)
809                 r = peer->custom_peerd_loop(peer);
810         else
811                 r = peerd_loop(peer);
812 #endif
813 out:
814         if (pid_fd > 0)
815                 pidfile_remove(pidfile, pid_fd);
816         return r;
817 }