Split peer request queues.
[archipelago] / xseg / peers / user / peer.c
1 /*
2  * Copyright 2012 GRNET S.A. All rights reserved.
3  *
4  * Redistribution and use in source and binary forms, with or
5  * without modification, are permitted provided that the following
6  * conditions are met:
7  *
8  *   1. Redistributions of source code must retain the above
9  *      copyright notice, this list of conditions and the following
10  *      disclaimer.
11  *   2. Redistributions in binary form must reproduce the above
12  *      copyright notice, this list of conditions and the following
13  *      disclaimer in the documentation and/or other materials
14  *      provided with the distribution.
15  *
16  * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17  * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18  * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19  * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20  * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23  * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24  * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25  * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26  * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27  * POSSIBILITY OF SUCH DAMAGE.
28  *
29  * The views and conclusions contained in the software and
30  * documentation are those of the authors and should not be
31  * interpreted as representing official policies, either expressed
32  * or implied, of GRNET S.A.
33  */
34
35 #define _GNU_SOURCE
36 #include <stdio.h>
37 #include <stdlib.h>
38 #include <sys/types.h>
39 #include <unistd.h>
40 #include <sys/syscall.h>
41 #include <sys/time.h>
42 #include <signal.h>
43 #include <sys/stat.h>
44 #include <fcntl.h>
45 #include <errno.h>
46 #ifdef MT
47 #include <pthread.h>
48 #endif
49
50 #include <xseg/xseg.h>
51 #include <peer.h>
52
53 #ifdef MT
54 #define PEER_TYPE "pthread"
55 #else
56 #define PEER_TYPE "posix"
57 #endif
58
59 //FIXME this should not be defined here probably
60 #define MAX_SPEC_LEN 128
61 #define MAX_PIDFILE_LEN 512
62
63 volatile unsigned int terminated = 0;
64 unsigned int verbose = 0;
65 struct log_ctx lc;
66 #ifdef ST_THREADS
67 uint32_t ta = 0;
68 #endif
69
70 #ifdef MT
71 struct peerd *global_peer;
72
73 /*
74 struct thread {
75         struct peerd *peer;
76         pthread_t tid;
77         pthread_cond_t cond;
78         pthread_mutex_t lock;
79         int thread_no;
80         void (*func)(void *arg);
81         void *arg;
82         void *priv;
83         struct xq free_thread_reqs;
84 };
85 */
86 inline static struct thread* alloc_thread(struct peerd *peer)
87 {
88         xqindex idx = xq_pop_head(&peer->threads, 1);
89         if (idx == Noneidx)
90                 return NULL;
91         return peer->thread + idx;
92 }
93
94 inline static void free_thread(struct peerd *peer, struct thread *t)
95 {
96         xqindex idx = t - peer->thread;
97         xq_append_head(&peer->threads, idx, 1);
98 }
99
100
101 inline static void __wake_up_thread(struct thread *t)
102 {
103         pthread_mutex_lock(&t->lock);
104         pthread_cond_signal(&t->cond);
105         pthread_mutex_unlock(&t->lock);
106 }
107
108 inline static void wake_up_thread(struct thread* t)
109 {
110         if (t){
111                 __wake_up_thread(t);
112         }
113 }
114
115 inline static int wake_up_next_thread(struct peerd *peer)
116 {
117         return (xseg_signal(peer->xseg, peer->portno_start));
118 }
119 #endif
120
121 /*
122  * extern is needed if this function is going to be called by another file
123  * such as bench-xseg.c
124  */
125
126 void signal_handler(int signal)
127 {
128         XSEGLOG2(&lc, I, "Caught signal. Terminating gracefully");
129         terminated = 1;
130 #ifdef MT
131         wake_up_next_thread(global_peer);
132 #endif
133 }
134
135 void renew_logfile(int signal)
136 {
137         XSEGLOG2(&lc, I, "Caught signal. Renewing logfile");
138         renew_logctx(&lc, NULL, verbose, NULL, REOPEN_FILE);
139 }
140
141 static int setup_signals(struct peerd *peer)
142 {
143         int r;
144         struct sigaction sa;
145 #ifdef MT
146         global_peer = peer;
147 #endif
148         sigemptyset(&sa.sa_mask);
149         sa.sa_flags = 0;
150         sa.sa_handler = signal_handler;
151         r = sigaction(SIGTERM, &sa, NULL);
152         if (r < 0)
153                 return r;
154         r = sigaction(SIGINT, &sa, NULL);
155         if (r < 0)
156                 return r;
157         r = sigaction(SIGQUIT, &sa, NULL);
158         if (r < 0)
159                 return r;
160
161         sa.sa_handler = renew_logfile;
162         r = sigaction(SIGUSR1, &sa, NULL);
163         if (r < 0)
164                 return r;
165
166         return r;
167 }
168
169 inline int canDefer(struct peerd *peer)
170 {
171         return !(peer->defer_portno == NoPort);
172 }
173
174 void print_req(struct xseg *xseg, struct xseg_request *req)
175 {
176         char target[64], data[64];
177         char *req_target, *req_data;
178         unsigned int end = (req->targetlen> 63) ? 63 : req->targetlen;
179         req_target = xseg_get_target(xseg, req);
180         req_data = xseg_get_data(xseg, req);
181
182         if (1) {
183                 strncpy(target, req_target, end);
184                 target[end] = 0;
185                 strncpy(data, req_data, 63);
186                 data[63] = 0;
187                 printf("req id:%lu, op:%u %llu:%lu serviced: %lu, reqstate: %u\n"
188                                 "src: %u, transit: %u, dst: %u effective dst: %u\n"
189                                 "target[%u]:'%s', data[%llu]:\n%s------------------\n\n",
190                                 (unsigned long)(req),
191                                 (unsigned int)req->op,
192                                 (unsigned long long)req->offset,
193                                 (unsigned long)req->size,
194                                 (unsigned long)req->serviced,
195                                 (unsigned int)req->state,
196                                 (unsigned int)req->src_portno,
197                                 (unsigned int)req->transit_portno,
198                                 (unsigned int)req->dst_portno,
199                                 (unsigned int)req->effective_dst_portno,
200                                 (unsigned int)req->targetlen, target,
201                                 (unsigned long long)req->datalen, data);
202         }
203 }
204 void log_pr(char *msg, struct peer_req *pr)
205 {
206         char target[64], data[64];
207         char *req_target, *req_data;
208         struct peerd *peer = pr->peer;
209         struct xseg *xseg = pr->peer->xseg;
210         req_target = xseg_get_target(xseg, pr->req);
211         req_data = xseg_get_data(xseg, pr->req);
212         /* null terminate name in case of req->target is less than 63 characters,
213          * and next character after name (aka first byte of next buffer) is not
214          * null
215          */
216         unsigned int end = (pr->req->targetlen> 63) ? 63 : pr->req->targetlen;
217         if (verbose) {
218                 strncpy(target, req_target, end);
219                 target[end] = 0;
220                 strncpy(data, req_data, 63);
221                 data[63] = 0;
222                 printf("%s: req id:%u, op:%u %llu:%lu serviced: %lu, retval: %lu, reqstate: %u\n"
223                                 "target[%u]:'%s', data[%llu]:\n%s------------------\n\n",
224                                 msg,
225                                 (unsigned int)(pr - peer->peer_reqs),
226                                 (unsigned int)pr->req->op,
227                                 (unsigned long long)pr->req->offset,
228                                 (unsigned long)pr->req->size,
229                                 (unsigned long)pr->req->serviced,
230                                 (unsigned long)pr->retval,
231                                 (unsigned int)pr->req->state,
232                                 (unsigned int)pr->req->targetlen, target,
233                                 (unsigned long long)pr->req->datalen, data);
234         }
235 }
236
237 #ifdef MT
238 inline struct peer_req *alloc_peer_req(struct peerd *peer, struct thread *t)
239 {
240         struct peer_req *pr;
241         struct thread *nt;
242         xqindex idx = xq_pop_head(&t->free_thread_reqs, t->thread_no);
243         if (idx != Noneidx)
244                 goto out;
245         /* try to steal from the next thread */
246         nt = &peer->thread[(t->thread_no + 1) % peer->nr_threads];
247         idx = xq_pop_head(&nt->free_thread_reqs, t->thread_no);
248         if (idx == Noneidx)
249                 return NULL;
250 out:
251         pr = peer->peer_reqs + idx;
252         pr->thread_no = t - peer->thread;
253         return pr;
254 }
255 #else
256 /*
257  * free_reqs is a queue that simply contains pointer offsets to the peer_reqs
258  * queue. If a pointer from peer_reqs is popped, we are certain that the
259  * associated memory in peer_reqs is free to use
260  */
261 inline struct peer_req *alloc_peer_req(struct peerd *peer)
262 {
263         xqindex idx = xq_pop_head(&peer->free_reqs, 1);
264         if (idx == Noneidx)
265                 return NULL;
266         return peer->peer_reqs + idx;
267 }
268 #endif
269
270 inline void free_peer_req(struct peerd *peer, struct peer_req *pr)
271 {
272         xqindex idx = pr - peer->peer_reqs;
273         pr->req = NULL;
274 #ifdef MT
275         struct thread *t = &peer->thread[pr->thread_no];
276         xq_append_head(&t->free_thread_reqs, idx, 1);
277 #else
278         xq_append_head(&peer->free_reqs, idx, 1);
279 #endif
280 }
281
282 struct timeval resp_start, resp_end, resp_accum = {0, 0};
283 uint64_t responds = 0;
284 void get_responds_stats(){
285                 printf("Time waiting respond %lu.%06lu sec for %llu times.\n",
286                                 //(unsigned int)(t - peer->thread),
287                                 resp_accum.tv_sec, resp_accum.tv_usec, (long long unsigned int) responds);
288 }
289
290 //FIXME error check
291 void fail(struct peerd *peer, struct peer_req *pr)
292 {
293         struct xseg_request *req = pr->req;
294         uint32_t p;
295         if (req){
296                 XSEGLOG2(&lc, D, "failing req %u", (unsigned int) (pr - peer->peer_reqs));
297                 req->state |= XS_FAILED;
298                 //xseg_set_req_data(peer->xseg, pr->req, NULL);
299                 p = xseg_respond(peer->xseg, req, pr->portno, X_ALLOC);
300                 xseg_signal(peer->xseg, p);
301         }
302         free_peer_req(peer, pr);
303 #ifdef MT
304         wake_up_next_thread(peer);
305 #endif
306 }
307
308 //FIXME error check
309 void complete(struct peerd *peer, struct peer_req *pr)
310 {
311         struct xseg_request *req = pr->req;
312         uint32_t p;
313         if (req){
314                 req->state |= XS_SERVED;
315                 //xseg_set_req_data(peer->xseg, pr->req, NULL);
316                 //gettimeofday(&resp_start, NULL);
317                 p = xseg_respond(peer->xseg, req, pr->portno, X_ALLOC);
318                 //gettimeofday(&resp_end, NULL);
319                 //responds++;
320                 //timersub(&resp_end, &resp_start, &resp_end);
321                 //timeradd(&resp_end, &resp_accum, &resp_accum);
322                 //printf("xseg_signal: %u\n", p);
323                 xseg_signal(peer->xseg, p);
324         }
325         free_peer_req(peer, pr);
326 #ifdef MT
327         wake_up_next_thread(peer);
328 #endif
329 }
330
331 static void handle_accepted(struct peerd *peer, struct peer_req *pr,
332                                 struct xseg_request *req)
333 {
334         struct xseg_request *xreq = pr->req;
335         //assert xreq == req;
336         XSEGLOG2(&lc, D, "Handle accepted");
337         xreq->serviced = 0;
338         //xreq->state = XS_ACCEPTED;
339         pr->retval = 0;
340         dispatch(peer, pr, req, dispatch_accept);
341 }
342
343 static void handle_received(struct peerd *peer, struct peer_req *pr,
344                                 struct xseg_request *req)
345 {
346         //struct xseg_request *req = pr->req;
347         //assert req->state != XS_ACCEPTED;
348         XSEGLOG2(&lc, D, "Handle received \n");
349         dispatch(peer, pr, req, dispatch_receive);
350
351 }
352 struct timeval sub_start, sub_end, sub_accum = {0, 0};
353 uint64_t submits = 0;
354 void get_submits_stats(){
355                 printf("Time waiting submit %lu.%06lu sec for %llu times.\n",
356                                 //(unsigned int)(t - peer->thread),
357                                 sub_accum.tv_sec, sub_accum.tv_usec, (long long unsigned int) submits);
358 }
359
360 int submit_peer_req(struct peerd *peer, struct peer_req *pr)
361 {
362         uint32_t ret;
363         struct xseg_request *req = pr->req;
364         // assert req->portno == peer->portno ?
365         //TODO small function with error checking
366         XSEGLOG2 (&lc, D, "submitting peer req %u\n", (unsigned int)(pr - peer->peer_reqs));
367         ret = xseg_set_req_data(peer->xseg, req, (void *)(pr));
368         if (ret < 0)
369                 return -1;
370         //printf("pr: %x , req_data: %x \n", pr, xseg_get_req_data(peer->xseg, req));
371         //gettimeofday(&sub_start, NULL);
372         ret = xseg_submit(peer->xseg, req, pr->portno, X_ALLOC);
373         //gettimeofday(&sub_end, NULL);
374         //submits++;
375         //timersub(&sub_end, &sub_start, &sub_end);
376         //timeradd(&sub_end, &sub_accum, &sub_accum);
377         if (ret == NoPort)
378                 return -1;
379         xseg_signal(peer->xseg, ret);
380         return 0;
381 }
382
383 #ifdef MT
384 int check_ports(struct peerd *peer, struct thread *t)
385 #else
386 int check_ports(struct peerd *peer)
387 #endif
388 {
389         struct xseg *xseg = peer->xseg;
390         xport portno_start = peer->portno_start;
391         xport portno_end = peer->portno_end;
392         struct xseg_request *accepted, *received;
393         struct peer_req *pr;
394         xport i;
395         int  r, c = 0;
396
397         for (i = portno_start; i <= portno_end; i++) {
398                 accepted = NULL;
399                 received = NULL;
400                 //Shouldn't we just leave?
401                 if (!isTerminate()) {
402                         //Better way than alloc/free all the time?
403                         //Cache the allocated peer_req?
404 #ifdef MT
405                         pr = alloc_peer_req(peer, t); 
406 #else
407                         pr = alloc_peer_req(peer);
408 #endif
409                         if (pr) {
410                                 accepted = xseg_accept(xseg, i, X_NONBLOCK);
411                                 if (accepted) {
412                                         pr->req = accepted;
413                                         pr->portno = i;
414                                         xseg_cancel_wait(xseg, i);
415                                         handle_accepted(peer, pr, accepted);
416                                         c = 1;
417                                 }
418                                 else {
419                                         free_peer_req(peer, pr);
420                                 }
421                         }
422                 }
423                 received = xseg_receive(xseg, i, X_NONBLOCK);
424                 if (received) {
425                         r =  xseg_get_req_data(xseg, received, (void **) &pr);
426                         if (r < 0 || !pr){
427                                 XSEGLOG2(&lc, W, "Received request with no pr data\n");
428                                 xport p = xseg_respond(peer->xseg, received, peer->portno_start, X_ALLOC);
429                                 if (p == NoPort){
430                                         XSEGLOG2(&lc, W, "Could not respond stale request");
431                                         xseg_put_request(xseg, received, portno_start);
432                                         continue;
433                                 } else {
434                                         xseg_signal(xseg, p);
435                                 }
436                         } else {
437                                 //maybe perform sanity check for pr
438                                 xseg_cancel_wait(xseg, i);
439                                 handle_received(peer, pr, received);
440                                 c = 1;
441                         }
442                 }
443         }
444
445         return c;
446 }
447
448 #ifdef MT
449 static void* thread_loop(void *arg)
450 {
451         struct thread *t = (struct thread *) arg;
452         struct peerd *peer = t->peer;
453         struct xseg *xseg = peer->xseg;
454         xport portno_start = peer->portno_start;
455         xport portno_end = peer->portno_end;
456         pid_t pid =syscall(SYS_gettid);
457         uint64_t loops;
458         uint64_t threshold=1000/(1 + portno_end - portno_start);
459
460         XSEGLOG2(&lc, D, "thread %u\n",  (unsigned int) (t- peer->thread));
461         XSEGLOG2(&lc, I, "Thread %u has tid %u.\n", (unsigned int) (t- peer->thread), pid);
462         xseg_init_local_signal(xseg, peer->portno_start);
463         for (;!(isTerminate() && xq_count(&peer->free_reqs) == peer->nr_ops);) {
464                 for(loops =  threshold; loops > 0; loops--) {
465                         if (loops == 1)
466                                 xseg_prepare_wait(xseg, peer->portno_start);
467                         if (check_ports(peer, t))
468                                 loops = threshold;
469                 }
470                 XSEGLOG2(&lc, I, "Thread %u goes to sleep\n", (unsigned int) (t- peer->thread));
471                 xseg_wait_signal(xseg, 10000000UL);
472                 xseg_cancel_wait(xseg, peer->portno_start);
473                 XSEGLOG2(&lc, I, "Thread %u woke up\n", (unsigned int) (t- peer->thread));
474         }
475         wake_up_next_thread(peer);
476         custom_peer_finalize(peer);
477         return NULL;
478 }
479
480 void *init_thread_loop(void *arg)
481 {
482         struct thread *t = (struct thread *) arg;
483         struct peerd *peer = t->peer;
484         char *thread_id;
485         int i;
486
487         /*
488          * We need an identifier for every thread that will spin in peerd_loop.
489          * The following code is a way to create a string of this format:
490          *              "Thread <num>"
491          * minus the null terminator. What we do is we create this string with
492          * snprintf and then resize it to exclude the null terminator with
493          * realloc. Finally, the result string is passed to the (void *arg) field
494          * of struct thread.
495          *
496          * Since the highest thread number can't be more than 5 digits, using 13
497          * chars should be more than enough.
498          */
499         thread_id = malloc(13 * sizeof(char));
500         snprintf(thread_id, 13, "Thread %ld", t - t->peer->thread);
501         for (i = 0; thread_id[i]; i++) {}
502         t->arg = (void *)realloc(thread_id, i-1);
503
504         //Start thread loop
505         (void)peer->peerd_loop(t);
506
507         wake_up_next_thread(peer);
508         custom_peer_finalize(peer);
509
510         return NULL;
511 }
512
513 int peerd_start_threads(struct peerd *peer)
514 {
515         int i;
516         uint32_t nr_threads = peer->nr_threads;
517         //TODO err check
518         for (i = 0; i < nr_threads; i++) {
519                 peer->thread[i].func = NULL;
520                 peer->thread[i].arg = NULL;
521                 peer->thread[i].peer = peer;
522                 pthread_cond_init(&peer->thread[i].cond,NULL);
523                 pthread_mutex_init(&peer->thread[i].lock, NULL);
524                 pthread_create(&peer->thread[i].tid, NULL,
525                                         init_thread_loop, (void *)(peer->thread + i));
526         }
527
528         if (peer->interactive_func)
529                 peer->interactive_func();
530         for (i = 0; i < nr_threads; i++) {
531                 pthread_join(peer->thread[i].tid, NULL);
532         }
533
534         return 0;
535 }
536 #endif
537
538 int defer_request(struct peerd *peer, struct peer_req *pr)
539 {
540         int r;
541         xport p;
542         if (!canDefer(peer)){
543                 XSEGLOG2(&lc, E, "Peer cannot defer requests");
544                 return -1;
545         }
546         p = xseg_forward(peer->xseg, pr->req, peer->defer_portno, pr->portno,
547                         X_ALLOC);
548         if (p == NoPort){
549                 XSEGLOG2(&lc, E, "Cannot defer request %lx", pr->req);
550                 return -1;
551         }
552         r = xseg_signal(peer->xseg, p);
553         if (r < 0) {
554                 XSEGLOG2(&lc, W, "Cannot signal port %lu", p);
555         }
556         free_peer_req(peer, pr);
557         return 0;
558 }
559
560 /*
561  * generic_peerd_loop is a general-purpose port-checker loop that is
562  * suitable both for multi-threaded and single-threaded peers.
563  */
564 static int generic_peerd_loop(void *arg)
565 {
566 #ifdef MT
567         struct thread *t = (struct thread *) arg;
568         struct peerd *peer = t->peer;
569         char *id = t->arg;
570 #else
571         struct peerd *peer = (struct peerd *) arg;
572         char id[4] = {'P','e','e','r'};
573 #endif
574         struct xseg *xseg = peer->xseg;
575         xport portno_start = peer->portno_start;
576         xport portno_end = peer->portno_end;
577         pid_t pid = syscall(SYS_gettid);
578         uint64_t threshold=1000/(1 + portno_end - portno_start);
579         uint64_t loops;
580
581         XSEGLOG2(&lc, I, "%s has tid %u.\n", id, pid);
582         xseg_init_local_signal(xseg, peer->portno_start);
583         for (;!(isTerminate() && xq_count(&peer->free_reqs) == peer->nr_ops);) {
584 #ifdef MT
585                 if (t->func) {
586                         XSEGLOG2(&lc, D, "%s executes function\n", id);
587                         xseg_cancel_wait(xseg, peer->portno_start);
588                         t->func(t->arg);
589                         t->func = NULL;
590                         t->arg = NULL;
591                         continue;
592                 }
593 #endif
594                 //Heart of peerd_loop. This loop is common for everyone.
595                 for(loops = threshold; loops > 0; loops--) {
596                         if (loops == 1)
597                                 xseg_prepare_wait(xseg, peer->portno_start);
598 #ifdef MT
599                         if (check_ports(peer, t))
600 #else
601                         if (check_ports(peer))
602 #endif
603                                 loops = threshold;
604                 }
605 #ifdef ST_THREADS
606                 if (ta){
607                         st_sleep(0);
608                         continue;
609                 }
610 #endif
611                 XSEGLOG2(&lc, I, "%s goes to sleep\n", id);
612                 xseg_wait_signal(xseg, 10000000UL);
613                 xseg_cancel_wait(xseg, peer->portno_start);
614                 XSEGLOG2(&lc, I, "%s woke up\n", id);
615         }
616         return 0;
617 }
618
619 static int init_peerd_loop(struct peerd *peer)
620 {
621         struct xseg *xseg = peer->xseg;
622
623         peer->peerd_loop(peer);
624         custom_peer_finalize(peer);
625         xseg_quit_local_signal(xseg, peer->portno_start);
626
627         return 0;
628 }
629
630 static struct xseg *join(char *spec)
631 {
632         struct xseg_config config;
633         struct xseg *xseg;
634
635         (void)xseg_parse_spec(spec, &config);
636         xseg = xseg_join(config.type, config.name, PEER_TYPE, NULL);
637         if (xseg)
638                 return xseg;
639
640         (void)xseg_create(&config);
641         return xseg_join(config.type, config.name, PEER_TYPE, NULL);
642 }
643
644 static struct peerd* peerd_init(uint32_t nr_ops, char* spec, long portno_start,
645                         long portno_end, uint32_t nr_threads, xport defer_portno)
646 {
647         int i;
648         struct peerd *peer;
649         struct xseg_port *port;
650         void *sd = NULL;
651         xport p;
652
653 #ifdef ST_THREADS
654         st_init();
655 #endif
656         peer = malloc(sizeof(struct peerd));
657         if (!peer) {
658                 perror("malloc");
659                 return NULL;
660         }
661         peer->nr_ops = nr_ops;
662         peer->defer_portno = defer_portno;
663 #ifdef MT
664         peer->nr_threads = nr_threads;
665         peer->thread = calloc(nr_threads, sizeof(struct thread));
666         if (!peer->thread)
667                 goto malloc_fail;
668         if (!xq_alloc_empty(&peer->threads, nr_threads))
669                 goto malloc_fail;
670         for (i = 0; i < nr_threads; i++) {
671                 peer->thread[i].thread_no = i;
672                 if (!xq_alloc_empty(&peer->thread[i].free_thread_reqs, nr_ops/nr_threads))
673                         goto malloc_fail;
674         }
675         for (i = 0; i < nr_ops; i++) {
676                 __xq_append_head(&peer->thread[i % nr_threads].free_thread_reqs, (xqindex)i);
677         }
678
679 #else
680         if (!xq_alloc_seq(&peer->free_reqs, nr_ops, nr_ops))
681                 goto malloc_fail;
682 #endif
683         peer->peer_reqs = calloc(nr_ops, sizeof(struct peer_req));
684         if (!peer->peer_reqs){
685 malloc_fail:
686                 perror("malloc");
687                 return NULL;
688         }
689         if (xseg_initialize()){
690                 printf("cannot initialize library\n");
691                 return NULL;
692         }
693         peer->xseg = join(spec);
694         if (!peer->xseg)
695                 return NULL;
696
697         peer->portno_start = (xport) portno_start;
698         peer->portno_end= (xport) portno_end;
699
700         /*
701          * Start binding ports from portno_start to portno_end.
702          * The first port we bind will have its signal_desc initialized by xseg
703          * and the same signal_desc will be used for all the other ports.
704          */
705         for (p = peer->portno_start; p <= peer->portno_end; p++) {
706                 port = xseg_bind_port(peer->xseg, p, sd);
707                 if (!port){
708                         printf("cannot bind to port %u\n", (unsigned int) p);
709                         return NULL;
710                 }
711                 if (p == peer->portno_start)
712                         sd = xseg_get_signal_desc(peer->xseg, port);
713         }
714
715         printf("Peer on ports  %u-%u\n", peer->portno_start, peer->portno_end);
716
717         for (i = 0; i < nr_ops; i++) {
718                 peer->peer_reqs[i].peer = peer;
719                 peer->peer_reqs[i].req = NULL;
720                 peer->peer_reqs[i].retval = 0;
721                 peer->peer_reqs[i].priv = NULL;
722                 peer->peer_reqs[i].portno = NoPort;
723
724         //Plug default peerd_loop. This can change later on by custom_peer_init.
725         peer->peerd_loop = generic_peerd_loop;
726
727 #ifdef ST_THREADS
728                 peer->peer_reqs[i].cond = st_cond_new(); //FIXME err check
729 #endif
730         }
731 #ifdef MT
732         peer->interactive_func = NULL;
733 #endif
734         return peer;
735
736 }
737
738 int pidfile_remove(char *path, int fd)
739 {
740         close(fd);
741         return (unlink(path));
742 }
743
744 int pidfile_write(int pid_fd)
745 {
746         char buf[16];
747         snprintf(buf, sizeof(buf), "%ld", syscall(SYS_gettid));
748         buf[15] = 0;
749
750         lseek(pid_fd, 0, SEEK_SET);
751         int ret = write(pid_fd, buf, strlen(buf));
752         return ret;
753 }
754
755 int pidfile_read(char *path, pid_t *pid)
756 {
757         char buf[16], *endptr;
758         *pid = 0;
759
760         int fd = open(path, O_RDONLY);
761         if (fd < 0)
762                 return -1;
763         int ret = read(fd, buf, 15);
764         buf[15]=0;
765         close(fd);
766         if (ret < 0)
767                 return -1;
768         else{
769                 *pid = strtol(buf, &endptr, 10);
770                 if (endptr != &buf[ret]){
771                         *pid = 0;
772                         return -1;
773                 }
774         }
775         return 0;
776 }
777
778 int pidfile_open(char *path, pid_t *old_pid)
779 {
780         //nfs version > 3
781         int fd = open(path, O_CREAT|O_EXCL|O_WRONLY, S_IWUSR);
782         if (fd < 0){
783                 if (errno == EEXIST)
784                         pidfile_read(path, old_pid);
785         }
786         return fd;
787 }
788
789 void usage(char *argv0)
790 {
791         fprintf(stderr, "Usage: %s [general options] [custom peer options]\n\n", argv0);
792         fprintf(stderr, "General peer options:\n"
793                 "  Option      | Default | \n"
794                 "  --------------------------------------------\n"
795                 "    -g        | None    | Segment spec to join\n"
796                 "    -sp       | NoPort  | Start portno to bind\n"
797                 "    -ep       | NoPort  | End portno to bind\n"
798                 "    -p        | NoPort  | Portno to bind\n"
799                 "    -n        | 16      | Number of ops\n"
800                 "    -v        | 0       | Verbosity level\n"
801                 "    -l        | None    | Logfile \n"
802                 "    -d        | No      | Daemonize \n"
803                 "    --pidfile | None    | Pidfile \n"
804 #ifdef MT
805                 "    -t        | No      | Number of threads \n"
806 #endif
807                 "\n"
808                );
809         custom_peer_usage();
810 }
811
812 int main(int argc, char *argv[])
813 {
814         struct peerd *peer = NULL;
815         //parse args
816         int r;
817         long portno_start = -1, portno_end = -1, portno = -1;
818
819         //set defaults here
820         int daemonize = 0, help = 0;
821         uint32_t nr_ops = 16;
822         uint32_t nr_threads = 1;
823         unsigned int debug_level = 0;
824         xport defer_portno = NoPort;
825         pid_t old_pid;
826         int pid_fd = -1;
827
828         char spec[MAX_SPEC_LEN + 1];
829         char logfile[MAX_LOGFILE_LEN + 1];
830         char pidfile[MAX_PIDFILE_LEN + 1];
831
832         logfile[0] = 0;
833         pidfile[0] = 0;
834         spec[0] = 0;
835
836         //capture here -g spec, -n nr_ops, -p portno, -t nr_threads -v verbose level
837         // -dp xseg_portno to defer blocking requests
838         // -l log file ?
839         //TODO print messages on arg parsing error
840         BEGIN_READ_ARGS(argc, argv);
841         READ_ARG_STRING("-g", spec, MAX_SPEC_LEN);
842         READ_ARG_ULONG("-sp", portno_start);
843         READ_ARG_ULONG("-ep", portno_end);
844         READ_ARG_ULONG("-p", portno);
845         READ_ARG_ULONG("-n", nr_ops);
846         READ_ARG_ULONG("-v", debug_level);
847 #ifdef MT
848         READ_ARG_ULONG("-t", nr_threads);
849 #endif
850         READ_ARG_ULONG("-dp", defer_portno);
851         READ_ARG_STRING("-l", logfile, MAX_LOGFILE_LEN);
852         READ_ARG_BOOL("-d", daemonize);
853         READ_ARG_BOOL("-h", help);
854         READ_ARG_BOOL("--help", help);
855         READ_ARG_STRING("--pidfile", pidfile, MAX_PIDFILE_LEN);
856         END_READ_ARGS();
857
858         if (help){
859                 usage(argv[0]);
860                 return 0;
861         }
862
863         r = init_logctx(&lc, argv[0], debug_level, logfile,
864                         REDIRECT_STDOUT|REDIRECT_STDERR);
865         if (r < 0){
866                 XSEGLOG("Cannot initialize logging to logfile");
867                 return -1;
868         }
869         XSEGLOG2(&lc, D, "Main thread has tid %ld.\n", syscall(SYS_gettid));
870
871         if (pidfile[0]){
872                 pid_fd = pidfile_open(pidfile, &old_pid);
873                 if (pid_fd < 0) {
874                         if (old_pid) {
875                                 XSEGLOG2(&lc, E, "Daemon already running, pid: %d.", old_pid);
876                         } else {
877                                 XSEGLOG2(&lc, E, "Cannot open or create pidfile");
878                         }
879                         return -1;
880                 }
881         }
882
883         if (daemonize){
884                 if (daemon(0, 1) < 0){
885                         XSEGLOG2(&lc, E, "Cannot daemonize");
886                         r = -1;
887                         goto out;
888                 }
889         }
890
891         pidfile_write(pid_fd);
892
893         //TODO perform argument sanity checks
894         verbose = debug_level;
895         if (portno != -1) {
896                 portno_start = portno;
897                 portno_end = portno;
898         }
899         if (portno_start == -1 || portno_end == -1){
900                 XSEGLOG2(&lc, E, "Portno or {portno_start, portno_end} must be supplied");
901                 usage(argv[0]);
902                 r = -1;
903                 goto out;
904         }
905
906         peer = peerd_init(nr_ops, spec, portno_start, portno_end, nr_threads, defer_portno);
907         if (!peer){
908                 r = -1;
909                 goto out;
910         }
911         setup_signals(peer);
912         r = custom_peer_init(peer, argc, argv);
913         if (r < 0)
914                 goto out;
915 #if defined(MT)
916         //TODO err check
917         peerd_start_threads(peer);
918 #elif defined(ST_THREADS)
919         st_thread_t st = st_thread_create(init_peerd_loop, peer, 1, 0);
920         r = st_thread_join(st, NULL);
921 #else
922         r = init_peerd_loop(peer);
923 #endif
924 out:
925         if (pid_fd > 0)
926                 pidfile_remove(pidfile, pid_fd);
927         return r;
928 }