Unify thread loop with peerd_loop
[archipelago] / xseg / peers / user / peer.c
1 /*
2  * Copyright 2012 GRNET S.A. All rights reserved.
3  *
4  * Redistribution and use in source and binary forms, with or
5  * without modification, are permitted provided that the following
6  * conditions are met:
7  *
8  *   1. Redistributions of source code must retain the above
9  *      copyright notice, this list of conditions and the following
10  *      disclaimer.
11  *   2. Redistributions in binary form must reproduce the above
12  *      copyright notice, this list of conditions and the following
13  *      disclaimer in the documentation and/or other materials
14  *      provided with the distribution.
15  *
16  * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17  * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18  * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19  * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20  * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23  * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24  * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25  * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26  * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27  * POSSIBILITY OF SUCH DAMAGE.
28  *
29  * The views and conclusions contained in the software and
30  * documentation are those of the authors and should not be
31  * interpreted as representing official policies, either expressed
32  * or implied, of GRNET S.A.
33  */
34
35 #define _GNU_SOURCE
36 #include <stdio.h>
37 #include <stdlib.h>
38 #include <sys/types.h>
39 #include <unistd.h>
40 #include <sys/syscall.h>
41 #include <sys/time.h>
42 #include <signal.h>
43 #include <sys/stat.h>
44 #include <fcntl.h>
45 #include <errno.h>
46
47 #ifdef MT
48 #include <pthread.h>
49 #endif
50
51 #include <xseg/xseg.h>
52 #include <peer.h>
53
54 #ifdef MT
55 #define PEER_TYPE "pthread"
56 #else
57 #define PEER_TYPE "posix"
58 #endif
59
60 //FIXME this should not be defined here probably
61 #define MAX_SPEC_LEN 128
62 #define MAX_PIDFILE_LEN 512
63
64 volatile unsigned int terminated = 0;
65 unsigned int verbose = 0;
66 struct log_ctx lc;
67 #ifdef ST_THREADS
68 uint32_t ta = 0;
69 #endif
70
71 #ifdef MT
/* Peer made reachable from signal_handler(); assigned in setup_signals(). */
struct peerd *global_peer;

/* Per-worker state used by the multi-threaded (MT) build. */
struct thread {
        struct peerd *peer;       /* peer this worker belongs to */
        pthread_t tid;            /* filled in by pthread_create() */
        pthread_cond_t cond;      /* signalled by __wake_up_thread() */
        pthread_mutex_t lock;     /* held while cond is signalled */
        void (*func)(void *);     /* reset to NULL by peerd_start_threads() */
        void *arg;                /* reset to NULL by peerd_start_threads() */
};
82
83 inline static struct thread* alloc_thread(struct peerd *peer)
84 {
85         xqindex idx = xq_pop_head(&peer->threads, 1);
86         if (idx == Noneidx)
87                 return NULL;
88         return peer->thread + idx;
89 }
90
91 inline static void free_thread(struct peerd *peer, struct thread *t)
92 {
93         xqindex idx = t - peer->thread;
94         xq_append_head(&peer->threads, idx, 1);
95 }
96
97
98 inline static void __wake_up_thread(struct thread *t)
99 {
100         pthread_mutex_lock(&t->lock);
101         pthread_cond_signal(&t->cond);
102         pthread_mutex_unlock(&t->lock);
103 }
104
/* NULL-tolerant wrapper around __wake_up_thread(). */
inline static void wake_up_thread(struct thread* t)
{
        if (!t)
                return;
        __wake_up_thread(t);
}
111
112 inline static int wake_up_next_thread(struct peerd *peer)
113 {
114         return (xseg_signal(peer->xseg, peer->portno_start));
115 }
116 #endif
117
118 /*
119  * extern is needed if this function is going to be called by another file
120  * such as bench-xseg.c
121  */
122
123 void signal_handler(int signal)
124 {
125         XSEGLOG2(&lc, I, "Caught signal. Terminating gracefully");
126         terminated = 1;
127 #ifdef MT
128         wake_up_next_thread(global_peer);
129 #endif
130 }
131
132 void renew_logfile(int signal)
133 {
134         XSEGLOG2(&lc, I, "Caught signal. Renewing logfile");
135         renew_logctx(&lc, NULL, verbose, NULL, REOPEN_FILE);
136 }
137
138 static int setup_signals(struct peerd *peer)
139 {
140         int r;
141         struct sigaction sa;
142 #ifdef MT
143         global_peer = peer;
144 #endif
145         sigemptyset(&sa.sa_mask);
146         sa.sa_flags = 0;
147         sa.sa_handler = signal_handler;
148         r = sigaction(SIGTERM, &sa, NULL);
149         if (r < 0)
150                 return r;
151         r = sigaction(SIGINT, &sa, NULL);
152         if (r < 0)
153                 return r;
154         r = sigaction(SIGQUIT, &sa, NULL);
155         if (r < 0)
156                 return r;
157
158         sa.sa_handler = renew_logfile;
159         r = sigaction(SIGUSR1, &sa, NULL);
160         if (r < 0)
161                 return r;
162
163         return r;
164 }
165
166 inline int canDefer(struct peerd *peer)
167 {
168         return !(peer->defer_portno == NoPort);
169 }
170
171 void print_req(struct xseg *xseg, struct xseg_request *req)
172 {
173         char target[64], data[64];
174         char *req_target, *req_data;
175         unsigned int end = (req->targetlen> 63) ? 63 : req->targetlen;
176         req_target = xseg_get_target(xseg, req);
177         req_data = xseg_get_data(xseg, req);
178
179         if (1) {
180                 strncpy(target, req_target, end);
181                 target[end] = 0;
182                 strncpy(data, req_data, 63);
183                 data[63] = 0;
184                 printf("req id:%lu, op:%u %llu:%lu serviced: %lu, reqstate: %u\n"
185                                 "src: %u, transit: %u, dst: %u effective dst: %u\n"
186                                 "target[%u]:'%s', data[%llu]:\n%s------------------\n\n",
187                                 (unsigned long)(req),
188                                 (unsigned int)req->op,
189                                 (unsigned long long)req->offset,
190                                 (unsigned long)req->size,
191                                 (unsigned long)req->serviced,
192                                 (unsigned int)req->state,
193                                 (unsigned int)req->src_portno,
194                                 (unsigned int)req->transit_portno,
195                                 (unsigned int)req->dst_portno,
196                                 (unsigned int)req->effective_dst_portno,
197                                 (unsigned int)req->targetlen, target,
198                                 (unsigned long long)req->datalen, data);
199         }
200 }
201 void log_pr(char *msg, struct peer_req *pr)
202 {
203         char target[64], data[64];
204         char *req_target, *req_data;
205         struct peerd *peer = pr->peer;
206         struct xseg *xseg = pr->peer->xseg;
207         req_target = xseg_get_target(xseg, pr->req);
208         req_data = xseg_get_data(xseg, pr->req);
209         /* null terminate name in case of req->target is less than 63 characters,
210          * and next character after name (aka first byte of next buffer) is not
211          * null
212          */
213         unsigned int end = (pr->req->targetlen> 63) ? 63 : pr->req->targetlen;
214         if (verbose) {
215                 strncpy(target, req_target, end);
216                 target[end] = 0;
217                 strncpy(data, req_data, 63);
218                 data[63] = 0;
219                 printf("%s: req id:%u, op:%u %llu:%lu serviced: %lu, retval: %lu, reqstate: %u\n"
220                                 "target[%u]:'%s', data[%llu]:\n%s------------------\n\n",
221                                 msg,
222                                 (unsigned int)(pr - peer->peer_reqs),
223                                 (unsigned int)pr->req->op,
224                                 (unsigned long long)pr->req->offset,
225                                 (unsigned long)pr->req->size,
226                                 (unsigned long)pr->req->serviced,
227                                 (unsigned long)pr->retval,
228                                 (unsigned int)pr->req->state,
229                                 (unsigned int)pr->req->targetlen, target,
230                                 (unsigned long long)pr->req->datalen, data);
231         }
232 }
233
234 inline struct peer_req *alloc_peer_req(struct peerd *peer)
235 {
236         xqindex idx = xq_pop_head(&peer->free_reqs, 1);
237         if (idx == Noneidx)
238                 return NULL;
239         return peer->peer_reqs + idx;
240 }
241
242 inline void free_peer_req(struct peerd *peer, struct peer_req *pr)
243 {
244         xqindex idx = pr - peer->peer_reqs;
245         pr->req = NULL;
246         xq_append_head(&peer->free_reqs, idx, 1);
247 }
248
/*
 * Accounting for time spent in xseg_respond(). Nothing in this file updates
 * these at the moment: the timing code in complete() is disabled.
 */
struct timeval resp_start, resp_end, resp_accum = {0, 0};
uint64_t responds = 0;

/* Print the accumulated respond time and the number of responds to stdout. */
void get_responds_stats(void)
{
        /* tv_sec/tv_usec are time_t/suseconds_t; cast to match %lu. */
        printf("Time waiting respond %lu.%06lu sec for %llu times.\n",
                        (unsigned long) resp_accum.tv_sec,
                        (unsigned long) resp_accum.tv_usec,
                        (unsigned long long) responds);
}
256
257 //FIXME error check
258 void fail(struct peerd *peer, struct peer_req *pr)
259 {
260         struct xseg_request *req = pr->req;
261         uint32_t p;
262         XSEGLOG2(&lc, D, "failing req %u", (unsigned int) (pr - peer->peer_reqs));
263         req->state |= XS_FAILED;
264         //xseg_set_req_data(peer->xseg, pr->req, NULL);
265         p = xseg_respond(peer->xseg, req, pr->portno, X_ALLOC);
266         xseg_signal(peer->xseg, p);
267         free_peer_req(peer, pr);
268 #ifdef MT
269         wake_up_next_thread(peer);
270 #endif
271 }
272
273 //FIXME error check
274 void complete(struct peerd *peer, struct peer_req *pr)
275 {
276         struct xseg_request *req = pr->req;
277         uint32_t p;
278         req->state |= XS_SERVED;
279         //xseg_set_req_data(peer->xseg, pr->req, NULL);
280         //gettimeofday(&resp_start, NULL);
281         p = xseg_respond(peer->xseg, req, pr->portno, X_ALLOC);
282         //gettimeofday(&resp_end, NULL);
283         //responds++;
284         //timersub(&resp_end, &resp_start, &resp_end);
285         //timeradd(&resp_end, &resp_accum, &resp_accum);
286         //printf("xseg_signal: %u\n", p);
287         xseg_signal(peer->xseg, p);
288         free_peer_req(peer, pr);
289 #ifdef MT
290         wake_up_next_thread(peer);
291 #endif
292 }
293
294 static void handle_accepted(struct peerd *peer, struct peer_req *pr, 
295                                 struct xseg_request *req)
296 {
297         struct xseg_request *xreq = pr->req;
298         //assert xreq == req;
299         XSEGLOG2(&lc, D, "Handle accepted");
300         xreq->serviced = 0;
301         //xreq->state = XS_ACCEPTED;
302         pr->retval = 0;
303         dispatch(peer, pr, req, dispatch_accept);
304 }
305
306 static void handle_received(struct peerd *peer, struct peer_req *pr,
307                                 struct xseg_request *req)
308 {
309         //struct xseg_request *req = pr->req;
310         //assert req->state != XS_ACCEPTED;
311         XSEGLOG2(&lc, D, "Handle received \n");
312         dispatch(peer, pr, req, dispatch_receive);
313
314 }
/*
 * Accounting for time spent in xseg_submit(). Nothing in this file updates
 * these at the moment: the timing code in submit_peer_req() is disabled.
 */
struct timeval sub_start, sub_end, sub_accum = {0, 0};
uint64_t submits = 0;

/* Print the accumulated submit time and the number of submits to stdout. */
void get_submits_stats(void)
{
        /* tv_sec/tv_usec are time_t/suseconds_t; cast to match %lu. */
        printf("Time waiting submit %lu.%06lu sec for %llu times.\n",
                        (unsigned long) sub_accum.tv_sec,
                        (unsigned long) sub_accum.tv_usec,
                        (unsigned long long) submits);
}
322
323 int submit_peer_req(struct peerd *peer, struct peer_req *pr)
324 {
325         uint32_t ret;
326         struct xseg_request *req = pr->req;
327         // assert req->portno == peer->portno ?
328         //TODO small function with error checking
329         XSEGLOG2 (&lc, D, "submitting peer req %u\n", (unsigned int)(pr - peer->peer_reqs));
330         ret = xseg_set_req_data(peer->xseg, req, (void *)(pr));
331         if (ret < 0)
332                 return -1;
333         //printf("pr: %x , req_data: %x \n", pr, xseg_get_req_data(peer->xseg, req));
334         //gettimeofday(&sub_start, NULL);
335         ret = xseg_submit(peer->xseg, req, pr->portno, X_ALLOC);
336         //gettimeofday(&sub_end, NULL);
337         //submits++;
338         //timersub(&sub_end, &sub_start, &sub_end);
339         //timeradd(&sub_end, &sub_accum, &sub_accum);
340         if (ret == NoPort)
341                 return -1;
342         xseg_signal(peer->xseg, ret);
343         return 0;
344 }
345
346 int check_ports(struct peerd *peer)
347 {
348         struct xseg *xseg = peer->xseg;
349         xport portno_start = peer->portno_start;
350         xport portno_end = peer->portno_end;
351         struct xseg_request *accepted, *received;
352         struct peer_req *pr;
353         xport i;
354         int  r, c = 0;
355
356         for (i = portno_start; i <= portno_end; i++) {
357                 accepted = NULL;
358                 received = NULL;
359                 if (!isTerminate()) {
360                         pr = alloc_peer_req(peer);
361                         if (pr) {
362                                 accepted = xseg_accept(xseg, i, X_NONBLOCK);
363                                 if (accepted) {
364                                         pr->req = accepted;
365                                         pr->portno = i;
366                                         xseg_cancel_wait(xseg, i);
367                                         handle_accepted(peer, pr, accepted);
368                                         c = 1;
369                                 }
370                                 else {
371                                         free_peer_req(peer, pr);
372                                 }
373                         }
374                 }
375                 received = xseg_receive(xseg, i, X_NONBLOCK);
376                 if (received) {
377                         r =  xseg_get_req_data(xseg, received, (void **) &pr);
378                         if (r < 0 || !pr){
379                                 XSEGLOG2(&lc, W, "Received request with no pr data\n");
380                                 xport p = xseg_respond(peer->xseg, received, peer->portno_start, X_ALLOC);
381                                 if (p == NoPort){
382                                         XSEGLOG2(&lc, W, "Could not respond stale request");
383                                         xseg_put_request(xseg, received, portno_start);
384                                         continue;
385                                 } else {
386                                         xseg_signal(xseg, p);
387                                 }
388                         } else {
389                                 //maybe perform sanity check for pr
390                                 xseg_cancel_wait(xseg, i);
391                                 handle_received(peer, pr, received);
392                                 c = 1;
393                         }
394                 }
395         }
396
397         return c;
398 }
399
400 #ifdef MT
401 static int peerd_loop(void *arg);
402
403 static void* thread_loop(void *arg)
404 {
405         struct thread *t = (struct thread *) arg;
406         struct peerd *peer = t->peer;
407         struct xseg *xseg = peer->xseg;
408         xport portno_start = peer->portno_start;
409         xport portno_end = peer->portno_end;
410         pid_t pid =syscall(SYS_gettid);
411         uint64_t loops;
412         uint64_t threshold=1000/(1 + portno_end - portno_start);
413
414         XSEGLOG2(&lc, D, "thread %u\n",  (unsigned int) (t- peer->thread));
415         XSEGLOG2(&lc, I, "Thread %u has tid %u.\n", (unsigned int) (t- peer->thread), pid);
416         xseg_init_local_signal(xseg, peer->portno_start);
417         for (;!(isTerminate() && xq_count(&peer->free_reqs) == peer->nr_ops);) {
418                 for(loops =  threshold; loops > 0; loops--) {
419                         if (loops == 1)
420                                 xseg_prepare_wait(xseg, peer->portno_start);
421                         if (check_ports(peer))
422                                 loops = threshold;
423                 }
424                 XSEGLOG2(&lc, I, "Thread %u goes to sleep\n", (unsigned int) (t- peer->thread));
425                 xseg_wait_signal(xseg, 10000000UL);
426                 xseg_cancel_wait(xseg, peer->portno_start);
427                 XSEGLOG2(&lc, I, "Thread %u woke up\n", (unsigned int) (t- peer->thread));
428         }
429         wake_up_next_thread(peer);
430         custom_peer_finalize(peer);
431         return NULL;
432 }
433
434 int peerd_start_threads(struct peerd *peer)
435 {
436         int i;
437         uint32_t nr_threads = peer->nr_threads;
438         //TODO err check
439         for (i = 0; i < nr_threads; i++) {
440                 peer->thread[i].peer = peer;
441                 pthread_cond_init(&peer->thread[i].cond,NULL);
442                 pthread_mutex_init(&peer->thread[i].lock, NULL);
443                 if (peer->custom_peerd_loop)
444                         pthread_create(&peer->thread[i].tid, NULL, peer->custom_peerd_loop, (void *)(peer->thread + i));
445                 else
446                         pthread_create(&peer->thread[i].tid, NULL, peerd_loop, (void *)(peer->thread + i));
447                 peer->thread[i].func = NULL;
448                 peer->thread[i].arg = NULL;
449         }
450
451         for (i = 0; i < nr_threads; i++) {
452                 pthread_join(peer->thread[i].tid, NULL);
453         }
454         //?: Is this re-ordering acceptable?
455         if (peer->interactive_func)
456                 peer->interactive_func();
457
458         return 0;
459 }
460 #endif
461
462
463 int defer_request(struct peerd *peer, struct peer_req *pr)
464 {
465         int r;
466         xport p;
467         if (!canDefer(peer)){
468                 XSEGLOG2(&lc, E, "Peer cannot defer requests");
469                 return -1;
470         }
471         p = xseg_forward(peer->xseg, pr->req, peer->defer_portno, pr->portno,
472                         X_ALLOC);
473         if (p == NoPort){
474                 XSEGLOG2(&lc, E, "Cannot defer request %lx", pr->req);
475                 return -1;
476         }
477         r = xseg_signal(peer->xseg, p);
478         if (r < 0) {
479                 XSEGLOG2(&lc, W, "Cannot signal port %lu", p);
480         }
481         free_peer_req(peer, pr);
482         return 0;
483 }
484
485 static int peerd_loop(void *arg)
486 {
487 #ifdef MT
488         struct thread *t = (struct thread *) arg;
489         struct peerd *peer = t->peer;
490 #else
491         struct peerd *peer = (struct peerd *) arg;
492 #endif
493         struct xseg *xseg = peer->xseg;
494         xport portno_start = peer->portno_start;
495         xport portno_end = peer->portno_end;
496         pid_t pid =syscall(SYS_gettid);
497         uint64_t threshold=1000/(1 + portno_end - portno_start);
498         uint64_t loops;
499
500         XSEGLOG2(&lc, I, "Peer has tid %u.\n", pid);
501         xseg_init_local_signal(xseg, peer->portno_start);
502         for (;!(isTerminate() && xq_count(&peer->free_reqs) == peer->nr_ops);) {
503                 for(loops= threshold; loops > 0; loops--) {
504                         if (loops == 1)
505                                 xseg_prepare_wait(xseg, peer->portno_start);
506                         if (check_ports(peer))
507                                 loops = threshold;
508                 }
509 #ifdef ST_THREADS
510                 if (ta){
511                         st_sleep(0);
512                         continue;
513                 }
514 #endif
515                 XSEGLOG2(&lc, I, "Peer goes to sleep\n");
516                 xseg_wait_signal(xseg, 10000000UL);
517                 xseg_cancel_wait(xseg, peer->portno_start);
518                 XSEGLOG2(&lc, I, "Peer woke up\n");
519         }
520 #ifdef MT
521         wake_up_next_thread(peer);
522         custom_peer_finalize(peer);
523 #else
524         custom_peer_finalize(peer);
525         xseg_quit_local_signal(xseg, peer->portno_start);
526 #endif
527         return 0;
528 }
529
530 static struct xseg *join(char *spec)
531 {
532         struct xseg_config config;
533         struct xseg *xseg;
534
535         (void)xseg_parse_spec(spec, &config);
536         xseg = xseg_join(config.type, config.name, PEER_TYPE, NULL);
537         if (xseg)
538                 return xseg;
539
540         (void)xseg_create(&config);
541         return xseg_join(config.type, config.name, PEER_TYPE, NULL);
542 }
543
544 static struct peerd* peerd_init(uint32_t nr_ops, char* spec, long portno_start,
545                         long portno_end, uint32_t nr_threads, xport defer_portno)
546 {
547         int i;
548         struct peerd *peer;
549         struct xseg_port *port;
550
551 #ifdef ST_THREADS
552         st_init();
553 #endif
554         peer = malloc(sizeof(struct peerd));
555         if (!peer) {
556                 perror("malloc");
557                 return NULL;
558         }
559         peer->nr_ops = nr_ops;
560         peer->defer_portno = defer_portno;
561 #ifdef MT
562         peer->nr_threads = nr_threads;
563         peer->thread = calloc(nr_threads, sizeof(struct thread));
564         if (!peer->thread)
565                 goto malloc_fail;
566 #endif
567         peer->peer_reqs = calloc(nr_ops, sizeof(struct peer_req));
568         if (!peer->peer_reqs){
569 malloc_fail:
570                 perror("malloc");
571                 return NULL;
572         }
573
574         if (!xq_alloc_seq(&peer->free_reqs, nr_ops, nr_ops))
575                 goto malloc_fail;
576 #ifdef MT
577         if (!xq_alloc_empty(&peer->threads, nr_threads))
578                 goto malloc_fail;
579 #endif
580         if (xseg_initialize()){
581                 printf("cannot initialize library\n");
582                 return NULL;
583         }
584         peer->xseg = join(spec);
585         if (!peer->xseg)
586                 return NULL;
587
588         peer->portno_start = (xport) portno_start;
589         peer->portno_end= (xport) portno_end;
590         port = xseg_bind_port(peer->xseg, peer->portno_start, NULL);
591         if (!port){
592                 printf("cannot bind to port %u\n", (unsigned int) peer->portno_start);
593                 return NULL;
594         }
595
596         xport p;
597         for (p = peer->portno_start + 1; p <= peer->portno_end; p++) {
598                 struct xseg_port *tmp;
599                 tmp = xseg_bind_port(peer->xseg, p, (void *)xseg_get_signal_desc(peer->xseg, port));
600                 if (!tmp){
601                         printf("cannot bind to port %u\n", (unsigned int) p);
602                         return NULL;
603                 }
604         }
605
606         printf("Peer on ports  %u-%u\n", peer->portno_start,
607                         peer->portno_end);
608
609         for (i = 0; i < nr_ops; i++) {
610                 peer->peer_reqs[i].peer = peer;
611                 peer->peer_reqs[i].req = NULL;
612                 peer->peer_reqs[i].retval = 0;
613                 peer->peer_reqs[i].priv = NULL;
614                 peer->peer_reqs[i].portno = NoPort;
615 #ifdef ST_THREADS
616                 peer->peer_reqs[i].cond = st_cond_new(); //FIXME err check
617 #endif
618         }
619 #ifdef MT
620         peer->interactive_func = NULL;
621 #endif
622         return peer;
623 }
624
/*
 * Close the pidfile descriptor and delete the pidfile.
 * Returns the result of unlink(): 0 on success, -1 on failure.
 */
int pidfile_remove(char *path, int fd)
{
        int rc;

        close(fd);
        rc = unlink(path);
        return rc;
}
630
/*
 * Write the caller's thread id (as reported by the gettid system call), in
 * decimal, at the start of the file open on pid_fd.
 * Returns whatever write() returned: the byte count, or -1 on failure.
 */
int pidfile_write(int pid_fd)
{
        char text[16];
        int written;

        snprintf(text, sizeof(text), "%ld", syscall(SYS_gettid));
        text[sizeof(text) - 1] = 0;

        lseek(pid_fd, 0, SEEK_SET);
        written = write(pid_fd, text, strlen(text));
        return written;
}
641
/*
 * Read a decimal pid from the file at path.
 *
 * At most 15 bytes are read and the whole of what was read must be a valid
 * number: trailing characters make the call fail.
 *
 * On success returns 0 and stores the pid in *pid (an empty file yields 0).
 * On failure (cannot open, cannot read, not a number) returns -1 and sets
 * *pid to 0.
 */
int pidfile_read(char *path, pid_t *pid)
{
        char buf[16], *endptr;
        int fd, ret;

        *pid = 0;

        fd = open(path, O_RDONLY);
        if (fd < 0)
                return -1;
        ret = read(fd, buf, sizeof(buf) - 1);
        close(fd);
        if (ret < 0)
                return -1;
        /* Terminate right after the bytes actually read, so strtol() never
         * walks into the uninitialised tail of buf. */
        buf[ret] = 0;

        *pid = strtol(buf, &endptr, 10);
        if (endptr != &buf[ret]){
                *pid = 0;
                return -1;
        }
        return 0;
}
664
/*
 * Create the pidfile exclusively and return a write-only descriptor for it.
 *
 * Returns the descriptor on success. On failure returns a negative value;
 * *old_pid then holds the pid found in an already existing pidfile, or 0
 * when the failure had another cause or the existing file was unreadable.
 *
 * Note: exclusive creation (O_CREAT|O_EXCL) over NFS needs NFS version 3 or
 * later.
 */
int pidfile_open(char *path, pid_t *old_pid)
{
        int fd;

        /* Callers inspect *old_pid on failure, so it must always be set. */
        *old_pid = 0;

        /* Owner-readable as well as writable, otherwise a non-root owner
         * could not read the file back in pidfile_read(). */
        fd = open(path, O_CREAT|O_EXCL|O_WRONLY,
                        S_IRUSR|S_IWUSR|S_IRGRP|S_IROTH);
        if (fd < 0 && errno == EEXIST)
                pidfile_read(path, old_pid);
        return fd;
}
675
/*
 * Print the command-line help to stderr: the usage line, the table of
 * general peer options (with the -t row only in MT builds), and then the
 * peer-specific options through custom_peer_usage().
 */
void usage(char *argv0)
{
        static const char general_options[] =
                "General peer options:\n"
                "  Option      | Default | \n"
                "  --------------------------------------------\n"
                "    -g        | None    | Segment spec to join\n"
                "    -sp       | NoPort  | Start portno to bind\n"
                "    -ep       | NoPort  | End portno to bind\n"
                "    -p        | NoPort  | Portno to bind\n"
                "    -n        | 16      | Number of ops\n"
                "    -v        | 0       | Verbosity level\n"
                "    -l        | None    | Logfile \n"
                "    -d        | No      | Daemonize \n"
                "    --pidfile | None    | Pidfile \n"
#ifdef MT
                "    -t        | No      | Number of threads \n"
#endif
                "\n";

        fprintf(stderr, "Usage: %s [general options] [custom peer options]\n\n", argv0);
        /* The table contains no conversion specifiers, so fputs() emits
         * exactly what fprintf() would. */
        fputs(general_options, stderr);
        custom_peer_usage();
}
698
699 int main(int argc, char *argv[])
700 {
701         struct peerd *peer = NULL;
702         //parse args
703         int r;
704         long portno_start = -1, portno_end = -1, portno = -1;
705
706         //set defaults here
707         int daemonize = 0, help = 0;
708         uint32_t nr_ops = 16;
709         uint32_t nr_threads = 1;
710         unsigned int debug_level = 0;
711         xport defer_portno = NoPort;
712         pid_t old_pid;
713         int pid_fd = -1;
714
715         char spec[MAX_SPEC_LEN + 1];
716         char logfile[MAX_LOGFILE_LEN + 1];
717         char pidfile[MAX_PIDFILE_LEN + 1];
718
719         logfile[0] = 0;
720         pidfile[0] = 0;
721         spec[0] = 0;
722
723         //capture here -g spec, -n nr_ops, -p portno, -t nr_threads -v verbose level
724         // -dp xseg_portno to defer blocking requests
725         // -l log file ?
726         //TODO print messages on arg parsing error
727         BEGIN_READ_ARGS(argc, argv);
728         READ_ARG_STRING("-g", spec, MAX_SPEC_LEN);
729         READ_ARG_ULONG("-sp", portno_start);
730         READ_ARG_ULONG("-ep", portno_end);
731         READ_ARG_ULONG("-p", portno);
732         READ_ARG_ULONG("-n", nr_ops);
733         READ_ARG_ULONG("-v", debug_level);
734 #ifdef MT
735         READ_ARG_ULONG("-t", nr_threads);
736 #endif
737         READ_ARG_ULONG("-dp", defer_portno);
738         READ_ARG_STRING("-l", logfile, MAX_LOGFILE_LEN);
739         READ_ARG_BOOL("-d", daemonize);
740         READ_ARG_BOOL("-h", help);
741         READ_ARG_BOOL("--help", help);
742         READ_ARG_STRING("--pidfile", pidfile, MAX_PIDFILE_LEN);
743         END_READ_ARGS();
744
745         if (help){
746                 usage(argv[0]);
747                 return 0;
748         }
749
750         r = init_logctx(&lc, argv[0], debug_level, logfile,
751                         REDIRECT_STDOUT|REDIRECT_STDERR);
752         if (r < 0){
753                 XSEGLOG("Cannot initialize logging to logfile");
754                 return -1;
755         }
756         XSEGLOG2(&lc, D, "Main thread has tid %ld.\n", syscall(SYS_gettid));
757
758         if (pidfile[0]){
759                 pid_fd = pidfile_open(pidfile, &old_pid);
760                 if (pid_fd < 0) {
761                         if (old_pid) {
762                                 XSEGLOG2(&lc, E, "Daemon already running, pid: %d.", old_pid);
763                         } else {
764                                 XSEGLOG2(&lc, E, "Cannot open or create pidfile");
765                         }
766                         return -1;
767                 }
768         }
769
770         if (daemonize){
771                 if (daemon(0, 1) < 0){
772                         XSEGLOG2(&lc, E, "Cannot daemonize");
773                         r = -1;
774                         goto out;
775                 }
776         }
777
778         pidfile_write(pid_fd);
779
780         //TODO perform argument sanity checks
781         verbose = debug_level;
782         if (portno != -1) {
783                 portno_start = portno;
784                 portno_end = portno;
785         }
786         if (portno_start == -1 || portno_end == -1){
787                 XSEGLOG2(&lc, E, "Portno or {portno_start, portno_end} must be supplied");
788                 usage(argv[0]);
789                 r = -1;
790                 goto out;
791         }
792
793         peer = peerd_init(nr_ops, spec, portno_start, portno_end, nr_threads, defer_portno);
794         if (!peer){
795                 r = -1;
796                 goto out;
797         }
798         setup_signals(peer);
799         r = custom_peer_init(peer, argc, argv);
800         if (r < 0)
801                 goto out;
802 #ifdef MT
803         //TODO err check
804         peerd_start_threads(peer);
805 #endif
806
807 #ifdef ST_THREADS
808         st_thread_t st = st_thread_create(peerd_loop, peer, 1, 0);
809         r = st_thread_join(st, NULL);
810 #else
811         if (peer->custom_peerd_loop)
812                 r = peer->custom_peerd_loop(peer);
813         else
814                 r = peerd_loop(peer);
815 #endif
816 out:
817         if (pid_fd > 0)
818                 pidfile_remove(pidfile, pid_fd);
819         return r;
820 }