Add support for custom peer loop
[archipelago] / xseg / peers / user / peer.c
1 /*
2  * Copyright 2012 GRNET S.A. All rights reserved.
3  *
4  * Redistribution and use in source and binary forms, with or
5  * without modification, are permitted provided that the following
6  * conditions are met:
7  *
8  *   1. Redistributions of source code must retain the above
9  *      copyright notice, this list of conditions and the following
10  *      disclaimer.
11  *   2. Redistributions in binary form must reproduce the above
12  *      copyright notice, this list of conditions and the following
13  *      disclaimer in the documentation and/or other materials
14  *      provided with the distribution.
15  *
16  * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17  * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18  * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19  * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20  * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23  * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24  * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25  * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26  * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27  * POSSIBILITY OF SUCH DAMAGE.
28  *
29  * The views and conclusions contained in the software and
30  * documentation are those of the authors and should not be
31  * interpreted as representing official policies, either expressed
32  * or implied, of GRNET S.A.
33  */
34
35 #define _GNU_SOURCE
36 #include <stdio.h>
37 #include <stdlib.h>
38 #include <sys/types.h>
39 #include <unistd.h>
40 #include <sys/syscall.h>
41 #include <sys/time.h>
42 #include <signal.h>
43 #include <sys/stat.h>
44 #include <fcntl.h>
45 #include <errno.h>
46
47 #ifdef MT
48 #include <pthread.h>
49 #endif
50
51 #include <xseg/xseg.h>
52 #include <peer.h>
53
54 #ifdef MT
55 #define PEER_TYPE "pthread"
56 #else
57 #define PEER_TYPE "posix"
58 #endif
59
60 //FIXME this should not be defined here probably
61 #define MAX_SPEC_LEN 128
62 #define MAX_PIDFILE_LEN 512
63
64 volatile unsigned int terminated = 0;
65 unsigned int verbose = 0;
66 struct log_ctx lc;
67 #ifdef ST_THREADS
68 uint32_t ta = 0;
69 #endif
70
71 #ifdef MT
72 struct peerd *global_peer;
73
74 struct thread {
75         struct peerd *peer;
76         pthread_t tid;
77         pthread_cond_t cond;
78         pthread_mutex_t lock;
79         void (*func)(void *arg);
80         void *arg;
81 };
82
83 inline static struct thread* alloc_thread(struct peerd *peer)
84 {
85         xqindex idx = xq_pop_head(&peer->threads, 1);
86         if (idx == Noneidx)
87                 return NULL;
88         return peer->thread + idx;
89 }
90
91 inline static void free_thread(struct peerd *peer, struct thread *t)
92 {
93         xqindex idx = t - peer->thread;
94         xq_append_head(&peer->threads, idx, 1);
95 }
96
97
98 inline static void __wake_up_thread(struct thread *t)
99 {
100         pthread_mutex_lock(&t->lock);
101         pthread_cond_signal(&t->cond);
102         pthread_mutex_unlock(&t->lock);
103 }
104
105 inline static void wake_up_thread(struct thread* t)
106 {
107         if (t){
108                 __wake_up_thread(t);
109         }
110 }
111
112 inline static int wake_up_next_thread(struct peerd *peer)
113 {
114         return (xseg_signal(peer->xseg, peer->portno_start));
115 }
116 #endif
117
118 /*
119  * extern is needed if this function is going to be called by another file
120  * such as bench-xseg.c
121  */
122 inline extern int isTerminate()
123 {
124 /* ta doesn't need to be taken into account, because the main loops
125  * doesn't check the terminated flag if ta is not 0.
126  */
127         /*
128 #ifdef ST_THREADS
129         return (!ta & terminated);
130 #else
131         return terminated;
132 #endif
133         */
134         return terminated;
135 }
136
137 void signal_handler(int signal)
138 {
139         XSEGLOG2(&lc, I, "Caught signal. Terminating gracefully");
140         terminated = 1;
141 #ifdef MT
142         wake_up_next_thread(global_peer);
143 #endif
144 }
145
146 void renew_logfile(int signal)
147 {
148         XSEGLOG2(&lc, I, "Caught signal. Renewing logfile");
149         renew_logctx(&lc, NULL, verbose, NULL, REOPEN_FILE);
150 }
151
152 static int setup_signals(struct peerd *peer)
153 {
154         int r;
155         struct sigaction sa;
156 #ifdef MT
157         global_peer = peer;
158 #endif
159         sigemptyset(&sa.sa_mask);
160         sa.sa_flags = 0;
161         sa.sa_handler = signal_handler;
162         r = sigaction(SIGTERM, &sa, NULL);
163         if (r < 0)
164                 return r;
165         r = sigaction(SIGINT, &sa, NULL);
166         if (r < 0)
167                 return r;
168         r = sigaction(SIGQUIT, &sa, NULL);
169         if (r < 0)
170                 return r;
171
172         sa.sa_handler = renew_logfile;
173         r = sigaction(SIGUSR1, &sa, NULL);
174         if (r < 0)
175                 return r;
176
177         return r;
178 }
179
180 inline int canDefer(struct peerd *peer)
181 {
182         return !(peer->defer_portno == NoPort);
183 }
184
185 void print_req(struct xseg *xseg, struct xseg_request *req)
186 {
187         char target[64], data[64];
188         char *req_target, *req_data;
189         unsigned int end = (req->targetlen> 63) ? 63 : req->targetlen;
190         req_target = xseg_get_target(xseg, req);
191         req_data = xseg_get_data(xseg, req);
192
193         if (1) {
194                 strncpy(target, req_target, end);
195                 target[end] = 0;
196                 strncpy(data, req_data, 63);
197                 data[63] = 0;
198                 printf("req id:%lu, op:%u %llu:%lu serviced: %lu, reqstate: %u\n"
199                                 "src: %u, transit: %u, dst: %u effective dst: %u\n"
200                                 "target[%u]:'%s', data[%llu]:\n%s------------------\n\n",
201                                 (unsigned long)(req),
202                                 (unsigned int)req->op,
203                                 (unsigned long long)req->offset,
204                                 (unsigned long)req->size,
205                                 (unsigned long)req->serviced,
206                                 (unsigned int)req->state,
207                                 (unsigned int)req->src_portno,
208                                 (unsigned int)req->transit_portno,
209                                 (unsigned int)req->dst_portno,
210                                 (unsigned int)req->effective_dst_portno,
211                                 (unsigned int)req->targetlen, target,
212                                 (unsigned long long)req->datalen, data);
213         }
214 }
215 void log_pr(char *msg, struct peer_req *pr)
216 {
217         char target[64], data[64];
218         char *req_target, *req_data;
219         struct peerd *peer = pr->peer;
220         struct xseg *xseg = pr->peer->xseg;
221         req_target = xseg_get_target(xseg, pr->req);
222         req_data = xseg_get_data(xseg, pr->req);
223         /* null terminate name in case of req->target is less than 63 characters,
224          * and next character after name (aka first byte of next buffer) is not
225          * null
226          */
227         unsigned int end = (pr->req->targetlen> 63) ? 63 : pr->req->targetlen;
228         if (verbose) {
229                 strncpy(target, req_target, end);
230                 target[end] = 0;
231                 strncpy(data, req_data, 63);
232                 data[63] = 0;
233                 printf("%s: req id:%u, op:%u %llu:%lu serviced: %lu, retval: %lu, reqstate: %u\n"
234                                 "target[%u]:'%s', data[%llu]:\n%s------------------\n\n",
235                                 msg,
236                                 (unsigned int)(pr - peer->peer_reqs),
237                                 (unsigned int)pr->req->op,
238                                 (unsigned long long)pr->req->offset,
239                                 (unsigned long)pr->req->size,
240                                 (unsigned long)pr->req->serviced,
241                                 (unsigned long)pr->retval,
242                                 (unsigned int)pr->req->state,
243                                 (unsigned int)pr->req->targetlen, target,
244                                 (unsigned long long)pr->req->datalen, data);
245         }
246 }
247
248 inline struct peer_req *alloc_peer_req(struct peerd *peer)
249 {
250         xqindex idx = xq_pop_head(&peer->free_reqs, 1);
251         if (idx == Noneidx)
252                 return NULL;
253         return peer->peer_reqs + idx;
254 }
255
256 inline void free_peer_req(struct peerd *peer, struct peer_req *pr)
257 {
258         xqindex idx = pr - peer->peer_reqs;
259         pr->req = NULL;
260         xq_append_head(&peer->free_reqs, idx, 1);
261 }
262
263 struct timeval resp_start, resp_end, resp_accum = {0, 0};
264 uint64_t responds = 0;
265 void get_responds_stats(){
266                 printf("Time waiting respond %lu.%06lu sec for %llu times.\n",
267                                 //(unsigned int)(t - peer->thread),
268                                 resp_accum.tv_sec, resp_accum.tv_usec, (long long unsigned int) responds);
269 }
270
271 //FIXME error check
272 void fail(struct peerd *peer, struct peer_req *pr)
273 {
274         struct xseg_request *req = pr->req;
275         uint32_t p;
276         XSEGLOG2(&lc, D, "failing req %u", (unsigned int) (pr - peer->peer_reqs));
277         req->state |= XS_FAILED;
278         //xseg_set_req_data(peer->xseg, pr->req, NULL);
279         p = xseg_respond(peer->xseg, req, pr->portno, X_ALLOC);
280         xseg_signal(peer->xseg, p);
281         free_peer_req(peer, pr);
282 #ifdef MT
283         wake_up_next_thread(peer);
284 #endif
285 }
286
287 //FIXME error check
288 void complete(struct peerd *peer, struct peer_req *pr)
289 {
290         struct xseg_request *req = pr->req;
291         uint32_t p;
292         req->state |= XS_SERVED;
293         //xseg_set_req_data(peer->xseg, pr->req, NULL);
294         //gettimeofday(&resp_start, NULL);
295         p = xseg_respond(peer->xseg, req, pr->portno, X_ALLOC);
296         //gettimeofday(&resp_end, NULL);
297         //responds++;
298         //timersub(&resp_end, &resp_start, &resp_end);
299         //timeradd(&resp_end, &resp_accum, &resp_accum);
300         //printf("xseg_signal: %u\n", p);
301         xseg_signal(peer->xseg, p);
302         free_peer_req(peer, pr);
303 #ifdef MT
304         wake_up_next_thread(peer);
305 #endif
306 }
307
308 static void handle_accepted(struct peerd *peer, struct peer_req *pr, 
309                                 struct xseg_request *req)
310 {
311         struct xseg_request *xreq = pr->req;
312         //assert xreq == req;
313         XSEGLOG2(&lc, D, "Handle accepted");
314         xreq->serviced = 0;
315         //xreq->state = XS_ACCEPTED;
316         pr->retval = 0;
317         dispatch(peer, pr, req, dispatch_accept);
318 }
319
320 static void handle_received(struct peerd *peer, struct peer_req *pr,
321                                 struct xseg_request *req)
322 {
323         //struct xseg_request *req = pr->req;
324         //assert req->state != XS_ACCEPTED;
325         XSEGLOG2(&lc, D, "Handle received \n");
326         dispatch(peer, pr, req, dispatch_receive);
327
328 }
329 struct timeval sub_start, sub_end, sub_accum = {0, 0};
330 uint64_t submits = 0;
331 void get_submits_stats(){
332                 printf("Time waiting submit %lu.%06lu sec for %llu times.\n",
333                                 //(unsigned int)(t - peer->thread),
334                                 sub_accum.tv_sec, sub_accum.tv_usec, (long long unsigned int) submits);
335 }
336
337 int submit_peer_req(struct peerd *peer, struct peer_req *pr)
338 {
339         uint32_t ret;
340         struct xseg_request *req = pr->req;
341         // assert req->portno == peer->portno ?
342         //TODO small function with error checking
343         XSEGLOG2 (&lc, D, "submitting peer req %u\n", (unsigned int)(pr - peer->peer_reqs));
344         ret = xseg_set_req_data(peer->xseg, req, (void *)(pr));
345         if (ret < 0)
346                 return -1;
347         //printf("pr: %x , req_data: %x \n", pr, xseg_get_req_data(peer->xseg, req));
348         //gettimeofday(&sub_start, NULL);
349         ret = xseg_submit(peer->xseg, req, pr->portno, X_ALLOC);
350         //gettimeofday(&sub_end, NULL);
351         //submits++;
352         //timersub(&sub_end, &sub_start, &sub_end);
353         //timeradd(&sub_end, &sub_accum, &sub_accum);
354         if (ret == NoPort)
355                 return -1;
356         xseg_signal(peer->xseg, ret);
357         return 0;
358 }
359
360 int check_ports(struct peerd *peer)
361 {
362         struct xseg *xseg = peer->xseg;
363         xport portno_start = peer->portno_start;
364         xport portno_end = peer->portno_end;
365         struct xseg_request *accepted, *received;
366         struct peer_req *pr;
367         xport i;
368         int  r, c = 0;
369
370         for (i = portno_start; i <= portno_end; i++) {
371                 accepted = NULL;
372                 received = NULL;
373                 if (!isTerminate()) {
374                         pr = alloc_peer_req(peer);
375                         if (pr) {
376                                 accepted = xseg_accept(xseg, i, X_NONBLOCK);
377                                 if (accepted) {
378                                         pr->req = accepted;
379                                         pr->portno = i;
380                                         xseg_cancel_wait(xseg, i);
381                                         handle_accepted(peer, pr, accepted);
382                                         c = 1;
383                                 }
384                                 else {
385                                         free_peer_req(peer, pr);
386                                 }
387                         }
388                 }
389                 received = xseg_receive(xseg, i, X_NONBLOCK);
390                 if (received) {
391                         r =  xseg_get_req_data(xseg, received, (void **) &pr);
392                         if (r < 0 || !pr){
393                                 XSEGLOG2(&lc, W, "Received request with no pr data\n");
394                                 xport p = xseg_respond(peer->xseg, received, peer->portno_start, X_ALLOC);
395                                 if (p == NoPort){
396                                         XSEGLOG2(&lc, W, "Could not respond stale request");
397                                         xseg_put_request(xseg, received, portno_start);
398                                         continue;
399                                 } else {
400                                         xseg_signal(xseg, p);
401                                 }
402                         } else {
403                                 //maybe perform sanity check for pr
404                                 xseg_cancel_wait(xseg, i);
405                                 handle_received(peer, pr, received);
406                                 c = 1;
407                         }
408                 }
409         }
410
411         return c;
412 }
413
414 #ifdef MT
415 int thread_execute(struct peerd *peer, void (*func)(void *arg), void *arg)
416 {
417         struct thread *t = alloc_thread(peer);
418         if (t) {
419                 t->func = func;
420                 t->arg = arg;
421                 wake_up_thread(t);
422                 return 0;
423         } else
424                 // we could hijack a thread
425                 return -1;
426 }
427
428 static void* thread_loop(void *arg)
429 {
430         struct thread *t = (struct thread *) arg;
431         struct peerd *peer = t->peer;
432         struct xseg *xseg = peer->xseg;
433         xport portno_start = peer->portno_start;
434         xport portno_end = peer->portno_end;
435         pid_t pid =syscall(SYS_gettid);
436         uint64_t loops;
437         uint64_t threshold=1000/(1 + portno_end - portno_start);
438
439         XSEGLOG2(&lc, D, "thread %u\n",  (unsigned int) (t- peer->thread));
440
441         XSEGLOG2(&lc, I, "Thread %u has tid %u.\n", (unsigned int) (t- peer->thread), pid);
442         xseg_init_local_signal(xseg, peer->portno_start);
443         for (;!(isTerminate() && xq_count(&peer->free_reqs) == peer->nr_ops);) {
444                 XSEGLOG("Head of loop.\n");
445                 if (t->func) {
446                         XSEGLOG2(&lc, D, "Thread %u executes function\n", (unsigned int) (t- peer->thread));
447                         xseg_cancel_wait(xseg, peer->portno_start);
448                         t->func(t->arg);
449                         t->func = NULL;
450                         t->arg = NULL;
451                         continue;
452                 }
453
454                 for(loops =  threshold; loops > 0; loops--) {
455                         if (loops == 1)
456                                 xseg_prepare_wait(xseg, peer->portno_start);
457                         if (check_ports(peer))
458                                 loops = threshold;
459                 }
460                 XSEGLOG2(&lc, I, "Thread %u goes to sleep\n", (unsigned int) (t- peer->thread));
461                 xseg_wait_signal(xseg, 10000000UL);
462                 xseg_cancel_wait(xseg, peer->portno_start);
463                 XSEGLOG2(&lc, I, "Thread %u woke up\n", (unsigned int) (t- peer->thread));
464         }
465         wake_up_next_thread(peer);
466         custom_peer_finalize(peer);
467         return NULL;
468 }
469
470 int peerd_start_threads(struct peerd *peer)
471 {
472         int i;
473         uint32_t nr_threads = peer->nr_threads;
474         //TODO err check
475         for (i = 0; i < nr_threads; i++) {
476                 peer->thread[i].peer = peer;
477                 pthread_cond_init(&peer->thread[i].cond,NULL);
478                 pthread_mutex_init(&peer->thread[i].lock, NULL);
479                 pthread_create(&peer->thread[i].tid, NULL, thread_loop, (void *)(peer->thread + i));
480                 peer->thread[i].func = NULL;
481                 peer->thread[i].arg = NULL;
482
483         }
484         return 0;
485 }
486 #endif
487
488
489 int defer_request(struct peerd *peer, struct peer_req *pr)
490 {
491         int r;
492         xport p;
493         if (!canDefer(peer)){
494                 XSEGLOG2(&lc, E, "Peer cannot defer requests");
495                 return -1;
496         }
497         p = xseg_forward(peer->xseg, pr->req, peer->defer_portno, pr->portno,
498                         X_ALLOC);
499         if (p == NoPort){
500                 XSEGLOG2(&lc, E, "Cannot defer request %lx", pr->req);
501                 return -1;
502         }
503         r = xseg_signal(peer->xseg, p);
504         if (r < 0) {
505                 XSEGLOG2(&lc, W, "Cannot signal port %lu", p);
506         }
507         free_peer_req(peer, pr);
508         return 0;
509 }
510
511 static int peerd_loop(struct peerd *peer)
512 {
513 #ifdef MT
514         int i;
515         if (peer->interactive_func)
516                 peer->interactive_func();
517         for (i = 0; i < peer->nr_threads; i++) {
518                 pthread_join(peer->thread[i].tid, NULL);
519         }
520 #else
521         struct xseg *xseg = peer->xseg;
522         xport portno_start = peer->portno_start;
523         xport portno_end = peer->portno_end;
524         uint64_t threshold=1000/(1 + portno_end - portno_start);
525         pid_t pid =syscall(SYS_gettid);
526         uint64_t loops;
527
528         XSEGLOG2(&lc, I, "Peer has tid %u.\n", pid);
529         xseg_init_local_signal(xseg, peer->portno_start);
530         for (;!(isTerminate() && xq_count(&peer->free_reqs) == peer->nr_ops);) {
531                 for(loops= threshold; loops > 0; loops--) {
532                         if (loops == 1)
533                                 xseg_prepare_wait(xseg, peer->portno_start);
534                         if (check_ports(peer))
535                                 loops = threshold;
536                 }
537 #ifdef ST_THREADS
538                 if (ta){
539                         st_sleep(0);
540                 } else {
541 #endif
542                         XSEGLOG2(&lc, I, "Peer goes to sleep\n");
543                         xseg_wait_signal(xseg, 10000000UL);
544                         xseg_cancel_wait(xseg, peer->portno_start);
545                         XSEGLOG2(&lc, I, "Peer woke up\n");
546 #ifdef ST_THREADS
547                 }
548 #endif
549         }
550         custom_peer_finalize(peer);
551         xseg_quit_local_signal(xseg, peer->portno_start);
552 #endif
553         return 0;
554 }
555
556 static struct xseg *join(char *spec)
557 {
558         struct xseg_config config;
559         struct xseg *xseg;
560
561         (void)xseg_parse_spec(spec, &config);
562         xseg = xseg_join(config.type, config.name, PEER_TYPE, NULL);
563         if (xseg)
564                 return xseg;
565
566         (void)xseg_create(&config);
567         return xseg_join(config.type, config.name, PEER_TYPE, NULL);
568 }
569
570 static struct peerd* peerd_init(uint32_t nr_ops, char* spec, long portno_start,
571                         long portno_end, uint32_t nr_threads, xport defer_portno)
572 {
573         int i;
574         struct peerd *peer;
575         struct xseg_port *port;
576
577 #ifdef ST_THREADS
578         st_init();
579 #endif
580         peer = malloc(sizeof(struct peerd));
581         if (!peer) {
582                 perror("malloc");
583                 return NULL;
584         }
585         peer->nr_ops = nr_ops;
586         peer->defer_portno = defer_portno;
587 #ifdef MT
588         peer->nr_threads = nr_threads;
589         peer->thread = calloc(nr_threads, sizeof(struct thread));
590         if (!peer->thread)
591                 goto malloc_fail;
592 #endif
593         peer->peer_reqs = calloc(nr_ops, sizeof(struct peer_req));
594         if (!peer->peer_reqs){
595 malloc_fail:
596                 perror("malloc");
597                 return NULL;
598         }
599
600         if (!xq_alloc_seq(&peer->free_reqs, nr_ops, nr_ops))
601                 goto malloc_fail;
602 #ifdef MT
603         if (!xq_alloc_empty(&peer->threads, nr_threads))
604                 goto malloc_fail;
605 #endif
606         if (xseg_initialize()){
607                 printf("cannot initialize library\n");
608                 return NULL;
609         }
610         peer->xseg = join(spec);
611         if (!peer->xseg)
612                 return NULL;
613
614         peer->portno_start = (xport) portno_start;
615         peer->portno_end= (xport) portno_end;
616         port = xseg_bind_port(peer->xseg, peer->portno_start, NULL);
617         if (!port){
618                 printf("cannot bind to port %u\n", (unsigned int) peer->portno_start);
619                 return NULL;
620         }
621
622         xport p;
623         for (p = peer->portno_start + 1; p <= peer->portno_end; p++) {
624                 struct xseg_port *tmp;
625                 tmp = xseg_bind_port(peer->xseg, p, (void *)xseg_get_signal_desc(peer->xseg, port));
626                 if (!tmp){
627                         printf("cannot bind to port %u\n", (unsigned int) p);
628                         return NULL;
629                 }
630         }
631
632         printf("Peer on ports  %u-%u\n", peer->portno_start,
633                         peer->portno_end);
634
635         for (i = 0; i < nr_ops; i++) {
636                 peer->peer_reqs[i].peer = peer;
637                 peer->peer_reqs[i].req = NULL;
638                 peer->peer_reqs[i].retval = 0;
639                 peer->peer_reqs[i].priv = NULL;
640                 peer->peer_reqs[i].portno = NoPort;
641 #ifdef ST_THREADS
642                 peer->peer_reqs[i].cond = st_cond_new(); //FIXME err check
643 #endif
644         }
645 #ifdef MT
646         peer->interactive_func = NULL;
647 #endif
648         return peer;
649 }
650
651 int pidfile_remove(char *path, int fd)
652 {
653         close(fd);
654         return (unlink(path));
655 }
656
657 int pidfile_write(int pid_fd)
658 {
659         char buf[16];
660         snprintf(buf, sizeof(buf), "%ld", syscall(SYS_gettid));
661         buf[15] = 0;
662
663         lseek(pid_fd, 0, SEEK_SET);
664         int ret = write(pid_fd, buf, strlen(buf));
665         return ret;
666 }
667
668 int pidfile_read(char *path, pid_t *pid)
669 {
670         char buf[16], *endptr;
671         *pid = 0;
672
673         int fd = open(path, O_RDONLY);
674         if (fd < 0)
675                 return -1;
676         int ret = read(fd, buf, 15);
677         buf[15]=0;
678         close(fd);
679         if (ret < 0)
680                 return -1;
681         else{
682                 *pid = strtol(buf, &endptr, 10);
683                 if (endptr != &buf[ret]){
684                         *pid = 0;
685                         return -1;
686                 }
687         }
688         return 0;
689 }
690
691 int pidfile_open(char *path, pid_t *old_pid)
692 {
693         //nfs version > 3
694         int fd = open(path, O_CREAT|O_EXCL|O_WRONLY, S_IWUSR);
695         if (fd < 0){
696                 if (errno == EEXIST)
697                         pidfile_read(path, old_pid);
698         }
699         return fd;
700 }
701
702 void usage(char *argv0)
703 {
704         fprintf(stderr, "Usage: %s [general options] [custom peer options]\n\n", argv0);
705         fprintf(stderr, "General peer options:\n"
706                 "  Option      | Default | \n"
707                 "  --------------------------------------------\n"
708                 "    -g        | None    | Segment spec to join\n"
709                 "    -sp       | NoPort  | Start portno to bind\n"
710                 "    -ep       | NoPort  | End portno to bind\n"
711                 "    -p        | NoPort  | Portno to bind\n"
712                 "    -n        | 16      | Number of ops\n"
713                 "    -v        | 0       | Verbosity level\n"
714                 "    -l        | None    | Logfile \n"
715                 "    -d        | No      | Daemonize \n"
716                 "    --pidfile | None    | Pidfile \n"
717 #ifdef MT
718                 "    -t        | No      | Number of threads \n"
719 #endif
720                 "\n"
721                );
722         custom_peer_usage();
723 }
724
725 int main(int argc, char *argv[])
726 {
727         struct peerd *peer = NULL;
728         //parse args
729         int r;
730         long portno_start = -1, portno_end = -1, portno = -1;
731
732         //set defaults here
733         int daemonize = 0, help = 0;
734         uint32_t nr_ops = 16;
735         uint32_t nr_threads = 1;
736         unsigned int debug_level = 0;
737         xport defer_portno = NoPort;
738         pid_t old_pid;
739         int pid_fd = -1;
740
741         char spec[MAX_SPEC_LEN + 1];
742         char logfile[MAX_LOGFILE_LEN + 1];
743         char pidfile[MAX_PIDFILE_LEN + 1];
744
745         logfile[0] = 0;
746         pidfile[0] = 0;
747         spec[0] = 0;
748
749         //capture here -g spec, -n nr_ops, -p portno, -t nr_threads -v verbose level
750         // -dp xseg_portno to defer blocking requests
751         // -l log file ?
752         //TODO print messages on arg parsing error
753         BEGIN_READ_ARGS(argc, argv);
754         READ_ARG_STRING("-g", spec, MAX_SPEC_LEN);
755         READ_ARG_ULONG("-sp", portno_start);
756         READ_ARG_ULONG("-ep", portno_end);
757         READ_ARG_ULONG("-p", portno);
758         READ_ARG_ULONG("-n", nr_ops);
759         READ_ARG_ULONG("-v", debug_level);
760 #ifdef MT
761         READ_ARG_ULONG("-t", nr_threads);
762 #endif
763         READ_ARG_ULONG("-dp", defer_portno);
764         READ_ARG_STRING("-l", logfile, MAX_LOGFILE_LEN);
765         READ_ARG_BOOL("-d", daemonize);
766         READ_ARG_BOOL("-h", help);
767         READ_ARG_BOOL("--help", help);
768         READ_ARG_STRING("--pidfile", pidfile, MAX_PIDFILE_LEN);
769         END_READ_ARGS();
770
771         if (help){
772                 usage(argv[0]);
773                 return 0;
774         }
775
776         r = init_logctx(&lc, argv[0], debug_level, logfile,
777                         REDIRECT_STDOUT|REDIRECT_STDERR);
778         if (r < 0){
779                 XSEGLOG("Cannot initialize logging to logfile");
780                 return -1;
781         }
782         XSEGLOG2(&lc, D, "Main thread has tid %ld.\n", syscall(SYS_gettid));
783
784         if (pidfile[0]){
785                 pid_fd = pidfile_open(pidfile, &old_pid);
786                 if (pid_fd < 0) {
787                         if (old_pid) {
788                                 XSEGLOG2(&lc, E, "Daemon already running, pid: %d.", old_pid);
789                         } else {
790                                 XSEGLOG2(&lc, E, "Cannot open or create pidfile");
791                         }
792                         return -1;
793                 }
794         }
795
796         if (daemonize){
797                 if (daemon(0, 1) < 0){
798                         XSEGLOG2(&lc, E, "Cannot daemonize");
799                         r = -1;
800                         goto out;
801                 }
802         }
803
804         pidfile_write(pid_fd);
805
806         //TODO perform argument sanity checks
807         verbose = debug_level;
808         if (portno != -1) {
809                 portno_start = portno;
810                 portno_end = portno;
811         }
812         if (portno_start == -1 || portno_end == -1){
813                 XSEGLOG2(&lc, E, "Portno or {portno_start, portno_end} must be supplied");
814                 usage(argv[0]);
815                 r = -1;
816                 goto out;
817         }
818
819         peer = peerd_init(nr_ops, spec, portno_start, portno_end, nr_threads, defer_portno);
820         if (!peer){
821                 r = -1;
822                 goto out;
823         }
824         setup_signals(peer);
825         r = custom_peer_init(peer, argc, argv);
826         if (r < 0)
827                 goto out;
828 #ifdef MT
829         //TODO err check
830         peerd_start_threads(peer);
831 #endif
832
833 #ifdef ST_THREADS
834         st_thread_t st = st_thread_create(peerd_loop, peer, 1, 0);
835         r = st_thread_join(st, NULL);
836 #else
837         if (peer->custom_peerd_loop)
838                 r = peer->custom_peerd_loop(peer);
839         else
840                 r = peerd_loop(peer);
841 #endif
842 out:
843         if (pid_fd > 0)
844                 pidfile_remove(pidfile, pid_fd);
845         return r;
846 }