c0a0dd570713d9c386261c4ccefc7a0a92a8c51f
[archipelago] / xseg / peers / user / peer.c
1 /*
2  * Copyright 2012 GRNET S.A. All rights reserved.
3  *
4  * Redistribution and use in source and binary forms, with or
5  * without modification, are permitted provided that the following
6  * conditions are met:
7  *
8  *   1. Redistributions of source code must retain the above
9  *      copyright notice, this list of conditions and the following
10  *      disclaimer.
11  *   2. Redistributions in binary form must reproduce the above
12  *      copyright notice, this list of conditions and the following
13  *      disclaimer in the documentation and/or other materials
14  *      provided with the distribution.
15  *
16  * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17  * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18  * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19  * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20  * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23  * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24  * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25  * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26  * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27  * POSSIBILITY OF SUCH DAMAGE.
28  *
29  * The views and conclusions contained in the software and
30  * documentation are those of the authors and should not be
31  * interpreted as representing official policies, either expressed
32  * or implied, of GRNET S.A.
33  */
34
35 #define _GNU_SOURCE
36 #include <stdio.h>
37 #include <stdlib.h>
38 #include <sys/types.h>
39 #include <unistd.h>
40 #include <sys/syscall.h>
41 #include <sys/time.h>
42 #include <signal.h>
43 #include <sys/stat.h>
44 #include <fcntl.h>
45 #include <errno.h>
46 #ifdef MT
47 #include <pthread.h>
48 #endif
49
50 #include <xseg/xseg.h>
51 #include <peer.h>
52
53 #ifdef MT
54 #ifdef ST_THREADS
55 #error "MT and ST_THREADS defines are mutually exclusive"
56 #endif
57 #endif
58
59 #ifdef MT
60 #define PEER_TYPE "pthread"
61 #else
62 #define PEER_TYPE "posix"
63 #endif
64
65 //FIXME this should not be defined here probably
66 #define MAX_SPEC_LEN 128
67 #define MAX_PIDFILE_LEN 512
68
69 volatile unsigned int terminated = 0;
70 unsigned int verbose = 0;
71 struct log_ctx lc;
72 #ifdef ST_THREADS
73 uint32_t ta = 0;
74 #endif
75
76 #ifdef MT
77 struct peerd *global_peer;
78 static pthread_key_t threadkey;
79
80 /*
81 struct thread {
82         struct peerd *peer;
83         pthread_t tid;
84         pthread_cond_t cond;
85         pthread_mutex_t lock;
86         int thread_no;
87         void (*func)(void *arg);
88         void *arg;
89         void *priv;
90         struct xq free_thread_reqs;
91 };
92 */
93 inline static struct thread* alloc_thread(struct peerd *peer)
94 {
95         xqindex idx = xq_pop_head(&peer->threads, 1);
96         if (idx == Noneidx)
97                 return NULL;
98         return peer->thread + idx;
99 }
100
101 inline static void free_thread(struct peerd *peer, struct thread *t)
102 {
103         xqindex idx = t - peer->thread;
104         xq_append_head(&peer->threads, idx, 1);
105 }
106
107
108 inline static void __wake_up_thread(struct thread *t)
109 {
110         pthread_mutex_lock(&t->lock);
111         pthread_cond_signal(&t->cond);
112         pthread_mutex_unlock(&t->lock);
113 }
114
115 inline static void wake_up_thread(struct thread* t)
116 {
117         if (t){
118                 __wake_up_thread(t);
119         }
120 }
121
122 inline static int wake_up_next_thread(struct peerd *peer)
123 {
124         return (xseg_signal(peer->xseg, peer->portno_start));
125 }
126 #endif
127
128 /*
129  * extern is needed if this function is going to be called by another file
130  * such as bench-xseg.c
131  */
132
133 void signal_handler(int signal)
134 {
135         XSEGLOG2(&lc, I, "Caught signal. Terminating gracefully");
136         terminated = 1;
137 #ifdef MT
138         wake_up_next_thread(global_peer);
139 #endif
140 }
141
142 void renew_logfile(int signal)
143 {
144         XSEGLOG2(&lc, I, "Caught signal. Renewing logfile");
145         renew_logctx(&lc, NULL, verbose, NULL, REOPEN_FILE);
146 }
147
148 static int setup_signals(struct peerd *peer)
149 {
150         int r;
151         struct sigaction sa;
152 #ifdef MT
153         global_peer = peer;
154 #endif
155         sigemptyset(&sa.sa_mask);
156         sa.sa_flags = 0;
157         sa.sa_handler = signal_handler;
158         r = sigaction(SIGTERM, &sa, NULL);
159         if (r < 0)
160                 return r;
161         r = sigaction(SIGINT, &sa, NULL);
162         if (r < 0)
163                 return r;
164         r = sigaction(SIGQUIT, &sa, NULL);
165         if (r < 0)
166                 return r;
167
168         sa.sa_handler = renew_logfile;
169         r = sigaction(SIGUSR1, &sa, NULL);
170         if (r < 0)
171                 return r;
172
173         return r;
174 }
175
176 inline int canDefer(struct peerd *peer)
177 {
178         return !(peer->defer_portno == NoPort);
179 }
180
181 void print_req(struct xseg *xseg, struct xseg_request *req)
182 {
183         char target[64], data[64];
184         char *req_target, *req_data;
185         unsigned int end = (req->targetlen> 63) ? 63 : req->targetlen;
186         req_target = xseg_get_target(xseg, req);
187         req_data = xseg_get_data(xseg, req);
188
189         if (1) {
190                 strncpy(target, req_target, end);
191                 target[end] = 0;
192                 strncpy(data, req_data, 63);
193                 data[63] = 0;
194                 printf("req id:%lu, op:%u %llu:%lu serviced: %lu, reqstate: %u\n"
195                                 "src: %u, transit: %u, dst: %u effective dst: %u\n"
196                                 "target[%u]:'%s', data[%llu]:\n%s------------------\n\n",
197                                 (unsigned long)(req),
198                                 (unsigned int)req->op,
199                                 (unsigned long long)req->offset,
200                                 (unsigned long)req->size,
201                                 (unsigned long)req->serviced,
202                                 (unsigned int)req->state,
203                                 (unsigned int)req->src_portno,
204                                 (unsigned int)req->transit_portno,
205                                 (unsigned int)req->dst_portno,
206                                 (unsigned int)req->effective_dst_portno,
207                                 (unsigned int)req->targetlen, target,
208                                 (unsigned long long)req->datalen, data);
209         }
210 }
211 void log_pr(char *msg, struct peer_req *pr)
212 {
213         char target[64], data[64];
214         char *req_target, *req_data;
215         struct peerd *peer = pr->peer;
216         struct xseg *xseg = pr->peer->xseg;
217         req_target = xseg_get_target(xseg, pr->req);
218         req_data = xseg_get_data(xseg, pr->req);
219         /* null terminate name in case of req->target is less than 63 characters,
220          * and next character after name (aka first byte of next buffer) is not
221          * null
222          */
223         unsigned int end = (pr->req->targetlen> 63) ? 63 : pr->req->targetlen;
224         if (verbose) {
225                 strncpy(target, req_target, end);
226                 target[end] = 0;
227                 strncpy(data, req_data, 63);
228                 data[63] = 0;
229                 printf("%s: req id:%u, op:%u %llu:%lu serviced: %lu, retval: %lu, reqstate: %u\n"
230                                 "target[%u]:'%s', data[%llu]:\n%s------------------\n\n",
231                                 msg,
232                                 (unsigned int)(pr - peer->peer_reqs),
233                                 (unsigned int)pr->req->op,
234                                 (unsigned long long)pr->req->offset,
235                                 (unsigned long)pr->req->size,
236                                 (unsigned long)pr->req->serviced,
237                                 (unsigned long)pr->retval,
238                                 (unsigned int)pr->req->state,
239                                 (unsigned int)pr->req->targetlen, target,
240                                 (unsigned long long)pr->req->datalen, data);
241         }
242 }
243
244 #ifdef MT
245 inline struct peer_req *alloc_peer_req(struct peerd *peer, struct thread *t)
246 {
247         struct peer_req *pr;
248         struct thread *nt;
249         int i;
250         xqindex idx = xq_pop_head(&t->free_thread_reqs, t->thread_no);
251         if (idx != Noneidx)
252                 goto out;
253
254         /* try to steal from another thread */
255         /*
256         for (i = t->thread_no + 1; i < (t->thread_no + peer->nr_threads); i++) {
257                 nt = &peer->thread[(t->thread_no + i) % peer->nr_threads];
258                 if (!xq_count(&nt->free_thread_reqs))
259                                 continue;
260                 idx = xq_pop_head(&nt->free_thread_reqs, t->thread_no);
261                 if (idx != Noneidx)
262                         goto out;
263         }
264         */
265         return NULL;
266 out:
267         pr = peer->peer_reqs + idx;
268         pr->thread_no = t - peer->thread;
269         return pr;
270 }
271 #else
272 /*
273  * free_reqs is a queue that simply contains pointer offsets to the peer_reqs
274  * queue. If a pointer from peer_reqs is popped, we are certain that the
275  * associated memory in peer_reqs is free to use
276  */
277 inline struct peer_req *alloc_peer_req(struct peerd *peer)
278 {
279         xqindex idx = xq_pop_head(&peer->free_reqs, 1);
280         if (idx == Noneidx)
281                 return NULL;
282         return peer->peer_reqs + idx;
283 }
284 #endif
285
286 inline void free_peer_req(struct peerd *peer, struct peer_req *pr)
287 {
288         xqindex idx = pr - peer->peer_reqs;
289         pr->req = NULL;
290 #ifdef MT
291         struct thread *t = &peer->thread[pr->thread_no];
292         xq_append_head(&t->free_thread_reqs, idx, 1);
293 #else
294         xq_append_head(&peer->free_reqs, idx, 1);
295 #endif
296 }
297
298 /*
299  * Count all free reqs in peer.
300  * Racy, if multithreaded, but the sum should monotonicly increase when checked
301  * after a termination signal is catched.
302  */
303 int all_peer_reqs_free(struct peerd *peer)
304 {
305         uint32_t free_reqs = 0;
306         int i;
307 #ifdef MT
308         for (i = 0; i < peer->nr_threads; i++) {
309                 free_reqs += xq_count(&peer->thread[i].free_thread_reqs);
310         }
311 #else
312         free_reqs = xq_count(&peer->free_reqs);
313 #endif
314         if (free_reqs == peer->nr_ops)
315                 return 1;
316         return 0;
317 }
318
319 struct timeval resp_start, resp_end, resp_accum = {0, 0};
320 uint64_t responds = 0;
321 void get_responds_stats(){
322                 printf("Time waiting respond %lu.%06lu sec for %llu times.\n",
323                                 //(unsigned int)(t - peer->thread),
324                                 resp_accum.tv_sec, resp_accum.tv_usec, (long long unsigned int) responds);
325 }
326
327 //FIXME error check
328 void fail(struct peerd *peer, struct peer_req *pr)
329 {
330         struct xseg_request *req = pr->req;
331         uint32_t p;
332         if (req){
333                 XSEGLOG2(&lc, D, "failing req %u", (unsigned int) (pr - peer->peer_reqs));
334                 req->state |= XS_FAILED;
335                 //xseg_set_req_data(peer->xseg, pr->req, NULL);
336                 p = xseg_respond(peer->xseg, req, pr->portno, X_ALLOC);
337                 xseg_signal(peer->xseg, p);
338         }
339         free_peer_req(peer, pr);
340 #ifdef MT
341         wake_up_next_thread(peer);
342 #endif
343 }
344
345 //FIXME error check
346 void complete(struct peerd *peer, struct peer_req *pr)
347 {
348         struct xseg_request *req = pr->req;
349         uint32_t p;
350         if (req){
351                 req->state |= XS_SERVED;
352                 //xseg_set_req_data(peer->xseg, pr->req, NULL);
353                 //gettimeofday(&resp_start, NULL);
354                 p = xseg_respond(peer->xseg, req, pr->portno, X_ALLOC);
355                 //gettimeofday(&resp_end, NULL);
356                 //responds++;
357                 //timersub(&resp_end, &resp_start, &resp_end);
358                 //timeradd(&resp_end, &resp_accum, &resp_accum);
359                 //printf("xseg_signal: %u\n", p);
360                 xseg_signal(peer->xseg, p);
361         }
362         free_peer_req(peer, pr);
363 #ifdef MT
364         wake_up_next_thread(peer);
365 #endif
366 }
367
368 static void handle_accepted(struct peerd *peer, struct peer_req *pr,
369                                 struct xseg_request *req)
370 {
371         struct xseg_request *xreq = pr->req;
372         //assert xreq == req;
373         XSEGLOG2(&lc, D, "Handle accepted");
374         xreq->serviced = 0;
375         //xreq->state = XS_ACCEPTED;
376         pr->retval = 0;
377         dispatch(peer, pr, req, dispatch_accept);
378 }
379
380 static void handle_received(struct peerd *peer, struct peer_req *pr,
381                                 struct xseg_request *req)
382 {
383         //struct xseg_request *req = pr->req;
384         //assert req->state != XS_ACCEPTED;
385         XSEGLOG2(&lc, D, "Handle received \n");
386         dispatch(peer, pr, req, dispatch_receive);
387
388 }
389 struct timeval sub_start, sub_end, sub_accum = {0, 0};
390 uint64_t submits = 0;
391 void get_submits_stats(){
392                 printf("Time waiting submit %lu.%06lu sec for %llu times.\n",
393                                 //(unsigned int)(t - peer->thread),
394                                 sub_accum.tv_sec, sub_accum.tv_usec, (long long unsigned int) submits);
395 }
396
397 int submit_peer_req(struct peerd *peer, struct peer_req *pr)
398 {
399         uint32_t ret;
400         struct xseg_request *req = pr->req;
401         // assert req->portno == peer->portno ?
402         //TODO small function with error checking
403         XSEGLOG2 (&lc, D, "submitting peer req %u\n", (unsigned int)(pr - peer->peer_reqs));
404         ret = xseg_set_req_data(peer->xseg, req, (void *)(pr));
405         if (ret < 0)
406                 return -1;
407         //printf("pr: %x , req_data: %x \n", pr, xseg_get_req_data(peer->xseg, req));
408         //gettimeofday(&sub_start, NULL);
409         ret = xseg_submit(peer->xseg, req, pr->portno, X_ALLOC);
410         //gettimeofday(&sub_end, NULL);
411         //submits++;
412         //timersub(&sub_end, &sub_start, &sub_end);
413         //timeradd(&sub_end, &sub_accum, &sub_accum);
414         if (ret == NoPort)
415                 return -1;
416         xseg_signal(peer->xseg, ret);
417         return 0;
418 }
419
420 #ifdef MT
421 int check_ports(struct peerd *peer, struct thread *t)
422 #else
423 int check_ports(struct peerd *peer)
424 #endif
425 {
426         struct xseg *xseg = peer->xseg;
427         xport portno_start = peer->portno_start;
428         xport portno_end = peer->portno_end;
429         struct xseg_request *accepted, *received;
430         struct peer_req *pr;
431         xport i;
432         int  r, c = 0;
433
434         for (i = portno_start; i <= portno_end; i++) {
435                 accepted = NULL;
436                 received = NULL;
437                 if (!isTerminate()) {
438 #ifdef MT
439                         pr = alloc_peer_req(peer, t); 
440 #else
441                         pr = alloc_peer_req(peer);
442 #endif
443                         if (pr) {
444                                 accepted = xseg_accept(xseg, i, X_NONBLOCK);
445                                 if (accepted) {
446                                         pr->req = accepted;
447                                         pr->portno = i;
448                                         xseg_cancel_wait(xseg, i);
449                                         handle_accepted(peer, pr, accepted);
450                                         c = 1;
451                                 }
452                                 else {
453                                         free_peer_req(peer, pr);
454                                 }
455                         }
456                 }
457                 received = xseg_receive(xseg, i, X_NONBLOCK);
458                 if (received) {
459                         r =  xseg_get_req_data(xseg, received, (void **) &pr);
460                         if (r < 0 || !pr){
461                                 XSEGLOG2(&lc, W, "Received request with no pr data\n");
462                                 xport p = xseg_respond(peer->xseg, received, peer->portno_start, X_ALLOC);
463                                 if (p == NoPort){
464                                         XSEGLOG2(&lc, W, "Could not respond stale request");
465                                         xseg_put_request(xseg, received, portno_start);
466                                         continue;
467                                 } else {
468                                         xseg_signal(xseg, p);
469                                 }
470                         } else {
471                                 //maybe perform sanity check for pr
472                                 xseg_cancel_wait(xseg, i);
473                                 handle_received(peer, pr, received);
474                                 c = 1;
475                         }
476                 }
477         }
478
479         return c;
480 }
481
482 #ifdef MT
483 static void* thread_loop(void *arg)
484 {
485         struct thread *t = (struct thread *) arg;
486         struct peerd *peer = t->peer;
487         struct xseg *xseg = peer->xseg;
488         xport portno_start = peer->portno_start;
489         xport portno_end = peer->portno_end;
490         pid_t pid =syscall(SYS_gettid);
491         uint64_t loops;
492         uint64_t threshold=1000/(1 + portno_end - portno_start);
493
494         XSEGLOG2(&lc, D, "thread %u\n",  (unsigned int) (t- peer->thread));
495         XSEGLOG2(&lc, I, "Thread %u has tid %u.\n", (unsigned int) (t- peer->thread), pid);
496         xseg_init_local_signal(xseg, peer->portno_start);
497         for (;!(isTerminate() && xq_count(&peer->free_reqs) == peer->nr_ops);) {
498                 for(loops =  threshold; loops > 0; loops--) {
499                         if (loops == 1)
500                                 xseg_prepare_wait(xseg, peer->portno_start);
501                         if (check_ports(peer, t))
502                                 loops = threshold;
503                 }
504                 XSEGLOG2(&lc, I, "Thread %u goes to sleep\n", (unsigned int) (t- peer->thread));
505                 xseg_wait_signal(xseg, 10000000UL);
506                 xseg_cancel_wait(xseg, peer->portno_start);
507                 XSEGLOG2(&lc, I, "Thread %u woke up\n", (unsigned int) (t- peer->thread));
508         }
509         wake_up_next_thread(peer);
510         custom_peer_finalize(peer);
511         return NULL;
512 }
513
514 void *init_thread_loop(void *arg)
515 {
516         struct thread *t = (struct thread *) arg;
517         struct peerd *peer = t->peer;
518         char *thread_id;
519         int i;
520
521         /*
522          * We need an identifier for every thread that will spin in peerd_loop.
523          * The following code is a way to create a string of this format:
524          *              "Thread <num>"
525          * minus the null terminator. What we do is we create this string with
526          * snprintf and then resize it to exclude the null terminator with
527          * realloc. Finally, the result string is passed to the (void *arg) field
528          * of struct thread.
529          *
530          * Since the highest thread number can't be more than 5 digits, using 13
531          * chars should be more than enough.
532          */
533         thread_id = malloc(13 * sizeof(char));
534         snprintf(thread_id, 13, "Thread %ld", t - t->peer->thread);
535         for (i = 0; thread_id[i]; i++) {}
536         t->arg = (void *)realloc(thread_id, i-1);
537         pthread_setspecific(threadkey, t);
538
539         //Start thread loop
540         (void)peer->peerd_loop(t);
541
542         wake_up_next_thread(peer);
543         custom_peer_finalize(peer);
544
545         return NULL;
546 }
547
548 int peerd_start_threads(struct peerd *peer)
549 {
550         int i;
551         uint32_t nr_threads = peer->nr_threads;
552         //TODO err check
553         for (i = 0; i < nr_threads; i++) {
554                 peer->thread[i].func = NULL;
555                 peer->thread[i].arg = NULL;
556                 peer->thread[i].peer = peer;
557                 pthread_cond_init(&peer->thread[i].cond,NULL);
558                 pthread_mutex_init(&peer->thread[i].lock, NULL);
559                 pthread_create(&peer->thread[i].tid, NULL,
560                                         init_thread_loop, (void *)(peer->thread + i));
561         }
562
563         if (peer->interactive_func)
564                 peer->interactive_func();
565         for (i = 0; i < nr_threads; i++) {
566                 pthread_join(peer->thread[i].tid, NULL);
567         }
568
569         return 0;
570 }
571 #endif
572
573 int defer_request(struct peerd *peer, struct peer_req *pr)
574 {
575         int r;
576         xport p;
577         if (!canDefer(peer)){
578                 XSEGLOG2(&lc, E, "Peer cannot defer requests");
579                 return -1;
580         }
581         p = xseg_forward(peer->xseg, pr->req, peer->defer_portno, pr->portno,
582                         X_ALLOC);
583         if (p == NoPort){
584                 XSEGLOG2(&lc, E, "Cannot defer request %lx", pr->req);
585                 return -1;
586         }
587         r = xseg_signal(peer->xseg, p);
588         if (r < 0) {
589                 XSEGLOG2(&lc, W, "Cannot signal port %lu", p);
590         }
591         free_peer_req(peer, pr);
592         return 0;
593 }
594
595 /*
596  * generic_peerd_loop is a general-purpose port-checker loop that is
597  * suitable both for multi-threaded and single-threaded peers.
598  */
599 static int generic_peerd_loop(void *arg)
600 {
601 #ifdef MT
602         struct thread *t = (struct thread *) arg;
603         struct peerd *peer = t->peer;
604         char *id = t->arg;
605 #else
606         struct peerd *peer = (struct peerd *) arg;
607         char id[4] = {'P','e','e','r'};
608 #endif
609         struct xseg *xseg = peer->xseg;
610         xport portno_start = peer->portno_start;
611         xport portno_end = peer->portno_end;
612         pid_t pid = syscall(SYS_gettid);
613         uint64_t threshold=1000/(1 + portno_end - portno_start);
614         uint64_t loops;
615
616         XSEGLOG2(&lc, I, "%s has tid %u.\n", id, pid);
617         xseg_init_local_signal(xseg, peer->portno_start);
618         //for (;!(isTerminate() && xq_count(&peer->free_reqs) == peer->nr_ops);) {
619         for (;!(isTerminate() && all_peer_reqs_free(peer));) {
620 #ifdef MT
621                 if (t->func) {
622                         XSEGLOG2(&lc, D, "%s executes function\n", id);
623                         xseg_cancel_wait(xseg, peer->portno_start);
624                         t->func(t->arg);
625                         t->func = NULL;
626                         t->arg = NULL;
627                         continue;
628                 }
629 #endif
630                 //Heart of peerd_loop. This loop is common for everyone.
631                 for(loops = threshold; loops > 0; loops--) {
632                         if (loops == 1)
633                                 xseg_prepare_wait(xseg, peer->portno_start);
634 #ifdef MT
635                         if (check_ports(peer, t))
636 #else
637                         if (check_ports(peer))
638 #endif
639                                 loops = threshold;
640                 }
641 #ifdef ST_THREADS
642                 if (ta){
643                         st_sleep(0);
644                         continue;
645                 }
646 #endif
647                 XSEGLOG2(&lc, I, "%s goes to sleep\n", id);
648                 xseg_wait_signal(xseg, 10000000UL);
649                 xseg_cancel_wait(xseg, peer->portno_start);
650                 XSEGLOG2(&lc, I, "%s woke up\n", id);
651         }
652         return 0;
653 }
654
655 static int init_peerd_loop(struct peerd *peer)
656 {
657         struct xseg *xseg = peer->xseg;
658
659         peer->peerd_loop(peer);
660         custom_peer_finalize(peer);
661         xseg_quit_local_signal(xseg, peer->portno_start);
662
663         return 0;
664 }
665
666 static struct xseg *join(char *spec)
667 {
668         struct xseg_config config;
669         struct xseg *xseg;
670
671         (void)xseg_parse_spec(spec, &config);
672         xseg = xseg_join(config.type, config.name, PEER_TYPE, NULL);
673         if (xseg)
674                 return xseg;
675
676         (void)xseg_create(&config);
677         return xseg_join(config.type, config.name, PEER_TYPE, NULL);
678 }
679
680 static struct peerd* peerd_init(uint32_t nr_ops, char* spec, long portno_start,
681                         long portno_end, uint32_t nr_threads, xport defer_portno)
682 {
683         int i;
684         struct peerd *peer;
685         struct xseg_port *port;
686         void *sd = NULL;
687         xport p;
688
689 #ifdef ST_THREADS
690         st_init();
691 #endif
692         peer = malloc(sizeof(struct peerd));
693         if (!peer) {
694                 perror("malloc");
695                 return NULL;
696         }
697         peer->nr_ops = nr_ops;
698         peer->defer_portno = defer_portno;
699 #ifdef MT
700         peer->nr_threads = nr_threads;
701         peer->thread = calloc(nr_threads, sizeof(struct thread));
702         if (!peer->thread)
703                 goto malloc_fail;
704         if (!xq_alloc_empty(&peer->threads, nr_threads))
705                 goto malloc_fail;
706         for (i = 0; i < nr_threads; i++) {
707                 peer->thread[i].thread_no = i;
708                 peer->thread[i].peer = peer;
709                 if (!xq_alloc_empty(&peer->thread[i].free_thread_reqs, nr_ops))
710                         goto malloc_fail;
711         }
712         for (i = 0; i < nr_ops; i++) {
713                 __xq_append_head(&peer->thread[i % nr_threads].free_thread_reqs, (xqindex)i);
714         }
715
716         pthread_key_create(&threadkey, NULL);
717 #else
718         if (!xq_alloc_seq(&peer->free_reqs, nr_ops, nr_ops))
719                 goto malloc_fail;
720 #endif
721         peer->peer_reqs = calloc(nr_ops, sizeof(struct peer_req));
722         if (!peer->peer_reqs){
723 malloc_fail:
724                 perror("malloc");
725                 return NULL;
726         }
727         if (xseg_initialize()){
728                 printf("cannot initialize library\n");
729                 return NULL;
730         }
731         peer->xseg = join(spec);
732         if (!peer->xseg)
733                 return NULL;
734
735         peer->portno_start = (xport) portno_start;
736         peer->portno_end= (xport) portno_end;
737
738         /*
739          * Start binding ports from portno_start to portno_end.
740          * The first port we bind will have its signal_desc initialized by xseg
741          * and the same signal_desc will be used for all the other ports.
742          */
743         for (p = peer->portno_start; p <= peer->portno_end; p++) {
744                 port = xseg_bind_port(peer->xseg, p, sd);
745                 if (!port){
746                         printf("cannot bind to port %u\n", (unsigned int) p);
747                         return NULL;
748                 }
749                 if (p == peer->portno_start)
750                         sd = xseg_get_signal_desc(peer->xseg, port);
751         }
752
753         printf("Peer on ports  %u-%u\n", peer->portno_start, peer->portno_end);
754
755         for (i = 0; i < nr_ops; i++) {
756                 peer->peer_reqs[i].peer = peer;
757                 peer->peer_reqs[i].req = NULL;
758                 peer->peer_reqs[i].retval = 0;
759                 peer->peer_reqs[i].priv = NULL;
760                 peer->peer_reqs[i].portno = NoPort;
761
762         //Plug default peerd_loop. This can change later on by custom_peer_init.
763         peer->peerd_loop = generic_peerd_loop;
764
765 #ifdef ST_THREADS
766                 peer->peer_reqs[i].cond = st_cond_new(); //FIXME err check
767 #endif
768         }
769 #ifdef MT
770         peer->interactive_func = NULL;
771 #endif
772         return peer;
773
774 }
775
776 int pidfile_remove(char *path, int fd)
777 {
778         close(fd);
779         return (unlink(path));
780 }
781
782 int pidfile_write(int pid_fd)
783 {
784         char buf[16];
785         snprintf(buf, sizeof(buf), "%ld", syscall(SYS_gettid));
786         buf[15] = 0;
787
788         lseek(pid_fd, 0, SEEK_SET);
789         int ret = write(pid_fd, buf, strlen(buf));
790         return ret;
791 }
792
793 int pidfile_read(char *path, pid_t *pid)
794 {
795         char buf[16], *endptr;
796         *pid = 0;
797
798         int fd = open(path, O_RDONLY);
799         if (fd < 0)
800                 return -1;
801         int ret = read(fd, buf, 15);
802         buf[15]=0;
803         close(fd);
804         if (ret < 0)
805                 return -1;
806         else{
807                 *pid = strtol(buf, &endptr, 10);
808                 if (endptr != &buf[ret]){
809                         *pid = 0;
810                         return -1;
811                 }
812         }
813         return 0;
814 }
815
816 int pidfile_open(char *path, pid_t *old_pid)
817 {
818         //nfs version > 3
819         int fd = open(path, O_CREAT|O_EXCL|O_WRONLY, S_IWUSR);
820         if (fd < 0){
821                 if (errno == EEXIST)
822                         pidfile_read(path, old_pid);
823         }
824         return fd;
825 }
826
827 void usage(char *argv0)
828 {
829         fprintf(stderr, "Usage: %s [general options] [custom peer options]\n\n", argv0);
830         fprintf(stderr, "General peer options:\n"
831                 "  Option      | Default | \n"
832                 "  --------------------------------------------\n"
833                 "    -g        | None    | Segment spec to join\n"
834                 "    -sp       | NoPort  | Start portno to bind\n"
835                 "    -ep       | NoPort  | End portno to bind\n"
836                 "    -p        | NoPort  | Portno to bind\n"
837                 "    -n        | 16      | Number of ops\n"
838                 "    -v        | 0       | Verbosity level\n"
839                 "    -l        | None    | Logfile \n"
840                 "    -d        | No      | Daemonize \n"
841                 "    --pidfile | None    | Pidfile \n"
842 #ifdef MT
843                 "    -t        | No      | Number of threads \n"
844 #endif
845                 "\n"
846                );
847         custom_peer_usage();
848 }
849
850 int main(int argc, char *argv[])
851 {
852         struct peerd *peer = NULL;
853         //parse args
854         int r;
855         long portno_start = -1, portno_end = -1, portno = -1;
856
857         //set defaults here
858         int daemonize = 0, help = 0;
859         uint32_t nr_ops = 16;
860         uint32_t nr_threads = 1;
861         unsigned int debug_level = 0;
862         xport defer_portno = NoPort;
863         pid_t old_pid;
864         int pid_fd = -1;
865
866         char spec[MAX_SPEC_LEN + 1];
867         char logfile[MAX_LOGFILE_LEN + 1];
868         char pidfile[MAX_PIDFILE_LEN + 1];
869
870         logfile[0] = 0;
871         pidfile[0] = 0;
872         spec[0] = 0;
873
874         //capture here -g spec, -n nr_ops, -p portno, -t nr_threads -v verbose level
875         // -dp xseg_portno to defer blocking requests
876         // -l log file ?
877         //TODO print messages on arg parsing error
878         BEGIN_READ_ARGS(argc, argv);
879         READ_ARG_STRING("-g", spec, MAX_SPEC_LEN);
880         READ_ARG_ULONG("-sp", portno_start);
881         READ_ARG_ULONG("-ep", portno_end);
882         READ_ARG_ULONG("-p", portno);
883         READ_ARG_ULONG("-n", nr_ops);
884         READ_ARG_ULONG("-v", debug_level);
885 #ifdef MT
886         READ_ARG_ULONG("-t", nr_threads);
887 #endif
888         READ_ARG_ULONG("-dp", defer_portno);
889         READ_ARG_STRING("-l", logfile, MAX_LOGFILE_LEN);
890         READ_ARG_BOOL("-d", daemonize);
891         READ_ARG_BOOL("-h", help);
892         READ_ARG_BOOL("--help", help);
893         READ_ARG_STRING("--pidfile", pidfile, MAX_PIDFILE_LEN);
894         END_READ_ARGS();
895
896         if (help){
897                 usage(argv[0]);
898                 return 0;
899         }
900
901         r = init_logctx(&lc, argv[0], debug_level, logfile,
902                         REDIRECT_STDOUT|REDIRECT_STDERR);
903         if (r < 0){
904                 XSEGLOG("Cannot initialize logging to logfile");
905                 return -1;
906         }
907         XSEGLOG2(&lc, D, "Main thread has tid %ld.\n", syscall(SYS_gettid));
908
909         if (pidfile[0]){
910                 pid_fd = pidfile_open(pidfile, &old_pid);
911                 if (pid_fd < 0) {
912                         if (old_pid) {
913                                 XSEGLOG2(&lc, E, "Daemon already running, pid: %d.", old_pid);
914                         } else {
915                                 XSEGLOG2(&lc, E, "Cannot open or create pidfile");
916                         }
917                         return -1;
918                 }
919         }
920
921         if (daemonize){
922                 if (daemon(0, 1) < 0){
923                         XSEGLOG2(&lc, E, "Cannot daemonize");
924                         r = -1;
925                         goto out;
926                 }
927         }
928
929         pidfile_write(pid_fd);
930
931         //TODO perform argument sanity checks
932         verbose = debug_level;
933         if (portno != -1) {
934                 portno_start = portno;
935                 portno_end = portno;
936         }
937         if (portno_start == -1 || portno_end == -1){
938                 XSEGLOG2(&lc, E, "Portno or {portno_start, portno_end} must be supplied");
939                 usage(argv[0]);
940                 r = -1;
941                 goto out;
942         }
943
944         peer = peerd_init(nr_ops, spec, portno_start, portno_end, nr_threads, defer_portno);
945         if (!peer){
946                 r = -1;
947                 goto out;
948         }
949         setup_signals(peer);
950         r = custom_peer_init(peer, argc, argv);
951         if (r < 0)
952                 goto out;
953 #if defined(MT)
954         //TODO err check
955         peerd_start_threads(peer);
956 #elif defined(ST_THREADS)
957         st_thread_t st = st_thread_create(init_peerd_loop, peer, 1, 0);
958         r = st_thread_join(st, NULL);
959 #else
960         r = init_peerd_loop(peer);
961 #endif
962 out:
963         if (pid_fd > 0)
964                 pidfile_remove(pidfile, pid_fd);
965         return r;
966 }