peer: Add core dumping and backtrace in segfaults
[archipelago] / xseg / peers / user / peer.c
1 /*
2  * Copyright 2012 GRNET S.A. All rights reserved.
3  *
4  * Redistribution and use in source and binary forms, with or
5  * without modification, are permitted provided that the following
6  * conditions are met:
7  *
8  *   1. Redistributions of source code must retain the above
9  *      copyright notice, this list of conditions and the following
10  *      disclaimer.
11  *   2. Redistributions in binary form must reproduce the above
12  *      copyright notice, this list of conditions and the following
13  *      disclaimer in the documentation and/or other materials
14  *      provided with the distribution.
15  *
16  * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17  * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18  * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19  * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20  * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23  * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24  * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25  * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26  * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27  * POSSIBILITY OF SUCH DAMAGE.
28  *
29  * The views and conclusions contained in the software and
30  * documentation are those of the authors and should not be
31  * interpreted as representing official policies, either expressed
32  * or implied, of GRNET S.A.
33  */
34
35 #define _GNU_SOURCE
36 #include <stdio.h>
37 #include <stdlib.h>
38 #include <sys/types.h>
39 #include <unistd.h>
40 #include <sys/syscall.h>
41 #include <sys/time.h>
42 #include <sys/resource.h>
43 #include <signal.h>
44 #include <sys/stat.h>
45 #include <fcntl.h>
46 #include <errno.h>
47 #include <sched.h>
48 #ifdef MT
49 #include <pthread.h>
50 #endif
51
52 #include <xseg/xseg.h>
53 #include <peer.h>
54
55 #ifdef MT
56 #ifdef ST_THREADS
57 #error "MT and ST_THREADS defines are mutually exclusive"
58 #endif
59 #endif
60
61 #ifdef MT
62 #define PEER_TYPE "pthread"
63 #else
64 #define PEER_TYPE "posix"
65 #endif
66
67 //FIXME this should not be defined here probably
68 #define MAX_SPEC_LEN 128
69 #define MAX_PIDFILE_LEN 512
70 #define MAX_CPUS_LEN 512
71
72 /* Define the cpus on which the threads/process will be pinned */
73 struct cpu_list {
74         int cpus[48];
75         int len;
76 };
77
78 struct cpu_list cpu_list;
79 volatile unsigned int terminated = 0;
80 unsigned int verbose = 0;
81 struct log_ctx lc;
82 #ifdef ST_THREADS
83 uint32_t ta = 0;
84 #endif
85
86 #ifdef MT
87 struct peerd *global_peer;
88 static pthread_key_t threadkey;
89
90 inline static int wake_up_next_thread(struct peerd *peer)
91 {
92         return (xseg_signal(peer->xseg, peer->portno_start));
93 }
94 #endif
95
96 /*
97  * extern is needed if this function is going to be called by another file
98  * such as bench-xseg.c
99  */
100
101 void signal_handler(int signal)
102 {
103 //      XSEGLOG2(&lc, I, "Caught signal. Terminating gracefully");
104         terminated = 1;
105 #ifdef MT
106         wake_up_next_thread(global_peer);
107 #endif
108 }
109
110 /*
111  * We want to both print the backtrace and dump the core. To do so, we fork a
112  * process that prints its stack trace, that should be the same as the parents.
113  * Then we wait for 1 sec and we abort. The reason we don't abort immediately
114  * is because it may interrupt the printing of the backtrace.
115  */
116 void segv_handler(int signal)
117 {
118         if (fork() == 0) {
119                 xseg_printtrace();
120                 _exit(1);
121         }
122
123         sleep(1);
124         abort();
125 }
126
127 void renew_logfile(int signal)
128 {
129 //      XSEGLOG2(&lc, I, "Caught signal. Renewing logfile");
130         renew_logctx(&lc, NULL, verbose, NULL, REOPEN_FILE);
131 }
132
133 static int setup_signals(struct peerd *peer)
134 {
135         int r;
136         struct sigaction sa;
137         struct rlimit rlim;
138 #ifdef MT
139         global_peer = peer;
140 #endif
141         sigemptyset(&sa.sa_mask);
142         sa.sa_flags = 0;
143         sa.sa_handler = signal_handler;
144         r = sigaction(SIGTERM, &sa, NULL);
145         if (r < 0)
146                 return r;
147         r = sigaction(SIGINT, &sa, NULL);
148         if (r < 0)
149                 return r;
150         r = sigaction(SIGQUIT, &sa, NULL);
151         if (r < 0)
152                 return r;
153
154         /*
155          * Get the current limits for core files and raise them to the largest
156          * value possible
157          */
158         if (getrlimit(RLIMIT_CORE, &rlim) < 0)
159                 return r;
160
161         rlim.rlim_cur = rlim.rlim_max;
162
163         if (setrlimit(RLIMIT_CORE, &rlim) < 0)
164                 return r;
165
166         /* Install handler for segfaults */
167         sa.sa_handler = segv_handler;
168         r = sigaction(SIGSEGV, &sa, NULL);
169         if (r < 0)
170                 return r;
171
172         sa.sa_handler = renew_logfile;
173         r = sigaction(SIGUSR1, &sa, NULL);
174         if (r < 0)
175                 return r;
176
177         return r;
178 }
179
180 inline int canDefer(struct peerd *peer)
181 {
182         return !(peer->defer_portno == NoPort);
183 }
184
185 void print_req(struct xseg *xseg, struct xseg_request *req)
186 {
187         char target[64], data[64];
188         char *req_target, *req_data;
189         unsigned int end = (req->targetlen> 63) ? 63 : req->targetlen;
190         req_target = xseg_get_target(xseg, req);
191         req_data = xseg_get_data(xseg, req);
192
193         if (1) {
194                 strncpy(target, req_target, end);
195                 target[end] = 0;
196                 strncpy(data, req_data, 63);
197                 data[63] = 0;
198                 printf("req id:%lu, op:%u %llu:%lu serviced: %lu, reqstate: %u\n"
199                                 "src: %u, transit: %u, dst: %u effective dst: %u\n"
200                                 "target[%u]:'%s', data[%llu]:\n%s------------------\n\n",
201                                 (unsigned long)(req),
202                                 (unsigned int)req->op,
203                                 (unsigned long long)req->offset,
204                                 (unsigned long)req->size,
205                                 (unsigned long)req->serviced,
206                                 (unsigned int)req->state,
207                                 (unsigned int)req->src_portno,
208                                 (unsigned int)req->transit_portno,
209                                 (unsigned int)req->dst_portno,
210                                 (unsigned int)req->effective_dst_portno,
211                                 (unsigned int)req->targetlen, target,
212                                 (unsigned long long)req->datalen, data);
213         }
214 }
215 void log_pr(char *msg, struct peer_req *pr)
216 {
217         char target[64], data[64];
218         char *req_target, *req_data;
219         struct peerd *peer = pr->peer;
220         struct xseg *xseg = pr->peer->xseg;
221         req_target = xseg_get_target(xseg, pr->req);
222         req_data = xseg_get_data(xseg, pr->req);
223         /* null terminate name in case of req->target is less than 63 characters,
224          * and next character after name (aka first byte of next buffer) is not
225          * null
226          */
227         unsigned int end = (pr->req->targetlen> 63) ? 63 : pr->req->targetlen;
228         if (verbose) {
229                 strncpy(target, req_target, end);
230                 target[end] = 0;
231                 strncpy(data, req_data, 63);
232                 data[63] = 0;
233                 printf("%s: req id:%u, op:%u %llu:%lu serviced: %lu, retval: %lu, reqstate: %u\n"
234                                 "target[%u]:'%s', data[%llu]:\n%s------------------\n\n",
235                                 msg,
236                                 (unsigned int)(pr - peer->peer_reqs),
237                                 (unsigned int)pr->req->op,
238                                 (unsigned long long)pr->req->offset,
239                                 (unsigned long)pr->req->size,
240                                 (unsigned long)pr->req->serviced,
241                                 (unsigned long)pr->retval,
242                                 (unsigned int)pr->req->state,
243                                 (unsigned int)pr->req->targetlen, target,
244                                 (unsigned long long)pr->req->datalen, data);
245         }
246 }
247
248 #ifdef MT
249 inline struct peer_req *alloc_peer_req(struct peerd *peer, struct thread *t)
250 {
251         struct peer_req *pr;
252         xqindex idx = xq_pop_head(&t->free_thread_reqs, t->thread_no);
253         if (idx != Noneidx)
254                 goto out;
255
256         /* try to steal from another thread */
257         /*
258         int i;
259         struct thread *nt;
260         for (i = t->thread_no + 1; i < (t->thread_no + peer->nr_threads); i++) {
261                 nt = &peer->thread[(t->thread_no + i) % peer->nr_threads];
262                 if (!xq_count(&nt->free_thread_reqs))
263                                 continue;
264                 idx = xq_pop_head(&nt->free_thread_reqs, t->thread_no);
265                 if (idx != Noneidx)
266                         goto out;
267         }
268         */
269         return NULL;
270 out:
271         pr = peer->peer_reqs + idx;
272         pr->thread_no = t - peer->thread;
273         return pr;
274 }
275 #else
276 /*
277  * free_reqs is a queue that simply contains pointer offsets to the peer_reqs
278  * queue. If a pointer from peer_reqs is popped, we are certain that the
279  * associated memory in peer_reqs is free to use
280  */
281 inline struct peer_req *alloc_peer_req(struct peerd *peer)
282 {
283         xqindex idx = xq_pop_head(&peer->free_reqs, 1);
284         if (idx == Noneidx)
285                 return NULL;
286         return peer->peer_reqs + idx;
287 }
288 #endif
289
290 inline void free_peer_req(struct peerd *peer, struct peer_req *pr)
291 {
292         xqindex idx = pr - peer->peer_reqs;
293         pr->req = NULL;
294 #ifdef MT
295         struct thread *t = &peer->thread[pr->thread_no];
296         xq_append_head(&t->free_thread_reqs, idx, 1);
297 #else
298         xq_append_head(&peer->free_reqs, idx, 1);
299 #endif
300 }
301
302 /*
303  * Count all free reqs in peer.
304  * Racy, if multithreaded, but the sum should monotonicly increase when checked
305  * after a termination signal is catched.
306  */
307 int all_peer_reqs_free(struct peerd *peer)
308 {
309         uint32_t free_reqs = 0;
310 #ifdef MT
311         int i;
312         for (i = 0; i < peer->nr_threads; i++) {
313                 free_reqs += xq_count(&peer->thread[i].free_thread_reqs);
314         }
315 #else
316         free_reqs = xq_count(&peer->free_reqs);
317 #endif
318         if (free_reqs == peer->nr_ops)
319                 return 1;
320         return 0;
321 }
322
323 struct timeval resp_start, resp_end, resp_accum = {0, 0};
324 uint64_t responds = 0;
325 void get_responds_stats(){
326                 printf("Time waiting respond %lu.%06lu sec for %llu times.\n",
327                                 //(unsigned int)(t - peer->thread),
328                                 resp_accum.tv_sec, resp_accum.tv_usec, (long long unsigned int) responds);
329 }
330
331 //FIXME error check
332 void fail(struct peerd *peer, struct peer_req *pr)
333 {
334         struct xseg_request *req = pr->req;
335         uint32_t p;
336         if (req){
337                 XSEGLOG2(&lc, D, "failing req %u", (unsigned int) (pr - peer->peer_reqs));
338                 req->state |= XS_FAILED;
339                 //xseg_set_req_data(peer->xseg, pr->req, NULL);
340                 p = xseg_respond(peer->xseg, req, pr->portno, X_ALLOC);
341                 xseg_signal(peer->xseg, p);
342         }
343         free_peer_req(peer, pr);
344 #ifdef MT
345         wake_up_next_thread(peer);
346 #endif
347 }
348
349 //FIXME error check
350 void complete(struct peerd *peer, struct peer_req *pr)
351 {
352         struct xseg_request *req = pr->req;
353         uint32_t p;
354         if (req){
355                 req->state |= XS_SERVED;
356                 //xseg_set_req_data(peer->xseg, pr->req, NULL);
357                 //gettimeofday(&resp_start, NULL);
358                 p = xseg_respond(peer->xseg, req, pr->portno, X_ALLOC);
359                 //gettimeofday(&resp_end, NULL);
360                 //responds++;
361                 //timersub(&resp_end, &resp_start, &resp_end);
362                 //timeradd(&resp_end, &resp_accum, &resp_accum);
363                 //printf("xseg_signal: %u\n", p);
364                 xseg_signal(peer->xseg, p);
365         }
366         free_peer_req(peer, pr);
367 #ifdef MT
368         wake_up_next_thread(peer);
369 #endif
370 }
371
372 static void handle_accepted(struct peerd *peer, struct peer_req *pr,
373                                 struct xseg_request *req)
374 {
375         struct xseg_request *xreq = pr->req;
376         //assert xreq == req;
377         XSEGLOG2(&lc, D, "Handle accepted");
378         xreq->serviced = 0;
379         //xreq->state = XS_ACCEPTED;
380         pr->retval = 0;
381         dispatch(peer, pr, req, dispatch_accept);
382 }
383
384 static void handle_received(struct peerd *peer, struct peer_req *pr,
385                                 struct xseg_request *req)
386 {
387         //struct xseg_request *req = pr->req;
388         //assert req->state != XS_ACCEPTED;
389         XSEGLOG2(&lc, D, "Handle received \n");
390         dispatch(peer, pr, req, dispatch_receive);
391
392 }
393 struct timeval sub_start, sub_end, sub_accum = {0, 0};
394 uint64_t submits = 0;
395 void get_submits_stats(){
396                 printf("Time waiting submit %lu.%06lu sec for %llu times.\n",
397                                 //(unsigned int)(t - peer->thread),
398                                 sub_accum.tv_sec, sub_accum.tv_usec, (long long unsigned int) submits);
399 }
400
401 int submit_peer_req(struct peerd *peer, struct peer_req *pr)
402 {
403         uint32_t ret;
404         struct xseg_request *req = pr->req;
405         // assert req->portno == peer->portno ?
406         //TODO small function with error checking
407         XSEGLOG2 (&lc, D, "submitting peer req %u\n", (unsigned int)(pr - peer->peer_reqs));
408         ret = xseg_set_req_data(peer->xseg, req, (void *)(pr));
409         if (ret < 0)
410                 return -1;
411         //printf("pr: %x , req_data: %x \n", pr, xseg_get_req_data(peer->xseg, req));
412         //gettimeofday(&sub_start, NULL);
413         ret = xseg_submit(peer->xseg, req, pr->portno, X_ALLOC);
414         //gettimeofday(&sub_end, NULL);
415         //submits++;
416         //timersub(&sub_end, &sub_start, &sub_end);
417         //timeradd(&sub_end, &sub_accum, &sub_accum);
418         if (ret == NoPort)
419                 return -1;
420         xseg_signal(peer->xseg, ret);
421         return 0;
422 }
423
424 #ifdef MT
425 int check_ports(struct peerd *peer, struct thread *t)
426 #else
427 int check_ports(struct peerd *peer)
428 #endif
429 {
430         struct xseg *xseg = peer->xseg;
431         xport portno_start = peer->portno_start;
432         xport portno_end = peer->portno_end;
433         struct xseg_request *accepted, *received;
434         struct peer_req *pr;
435         xport i;
436         int  r, c = 0;
437
438         for (i = portno_start; i <= portno_end; i++) {
439                 accepted = NULL;
440                 received = NULL;
441                 if (!isTerminate()) {
442 #ifdef MT
443                         pr = alloc_peer_req(peer, t); 
444 #else
445                         pr = alloc_peer_req(peer);
446 #endif
447                         if (pr) {
448                                 accepted = xseg_accept(xseg, i, X_NONBLOCK);
449                                 if (accepted) {
450                                         pr->req = accepted;
451                                         pr->portno = i;
452                                         xseg_cancel_wait(xseg, i);
453                                         handle_accepted(peer, pr, accepted);
454                                         c = 1;
455                                 }
456                                 else {
457                                         free_peer_req(peer, pr);
458                                 }
459                         }
460                 }
461                 received = xseg_receive(xseg, i, X_NONBLOCK);
462                 if (received) {
463                         r =  xseg_get_req_data(xseg, received, (void **) &pr);
464                         if (r < 0 || !pr){
465                                 XSEGLOG2(&lc, W, "Received request with no pr data\n");
466                                 xport p = xseg_respond(peer->xseg, received, peer->portno_start, X_ALLOC);
467                                 if (p == NoPort){
468                                         XSEGLOG2(&lc, W, "Could not respond stale request");
469                                         xseg_put_request(xseg, received, portno_start);
470                                         continue;
471                                 } else {
472                                         xseg_signal(xseg, p);
473                                 }
474                         } else {
475                                 //maybe perform sanity check for pr
476                                 xseg_cancel_wait(xseg, i);
477                                 handle_received(peer, pr, received);
478                                 c = 1;
479                         }
480                 }
481         }
482
483         return c;
484 }
485
486 #ifdef MT
487 static void* thread_loop(void *arg)
488 {
489         struct thread *t = (struct thread *) arg;
490         struct peerd *peer = t->peer;
491         struct xseg *xseg = peer->xseg;
492         xport portno_start = peer->portno_start;
493         xport portno_end = peer->portno_end;
494         pid_t pid =syscall(SYS_gettid);
495         uint64_t loops;
496         uint64_t threshold=1000/(1 + portno_end - portno_start);
497
498         XSEGLOG2(&lc, D, "thread %u\n",  (unsigned int) (t- peer->thread));
499         XSEGLOG2(&lc, I, "Thread %u has tid %u.\n", (unsigned int) (t- peer->thread), pid);
500         xseg_init_local_signal(xseg, peer->portno_start);
501         for (;!(isTerminate() && xq_count(&peer->free_reqs) == peer->nr_ops);) {
502                 for(loops =  threshold; loops > 0; loops--) {
503                         if (loops == 1)
504                                 xseg_prepare_wait(xseg, peer->portno_start);
505                         if (check_ports(peer, t))
506                                 loops = threshold;
507                 }
508                 XSEGLOG2(&lc, I, "Thread %u goes to sleep\n", (unsigned int) (t- peer->thread));
509                 xseg_wait_signal(xseg, 10000000UL);
510                 xseg_cancel_wait(xseg, peer->portno_start);
511                 XSEGLOG2(&lc, I, "Thread %u woke up\n", (unsigned int) (t- peer->thread));
512         }
513         wake_up_next_thread(peer);
514         custom_peer_finalize(peer);
515         return NULL;
516 }
517
518 void *init_thread_loop(void *arg)
519 {
520         struct thread *t = (struct thread *) arg;
521         struct peerd *peer = t->peer;
522         pthread_t thread = pthread_self();
523         long int thread_num = t - t->peer->thread;
524         char *thread_id;
525         int i;
526
527         cpu_set_t mask;
528         int r;
529
530         if (cpu_list.len) {
531                 CPU_ZERO(&mask);
532                 CPU_SET(cpu_list.cpus[thread_num], &mask);
533                 r = pthread_setaffinity_np(thread, sizeof(mask), &mask);
534                 if (r < 0) {
535                         perror("sched_setaffinity");
536                         return NULL;
537                 }
538         }
539
540
541         /*
542          * We need an identifier for every thread that will spin in peerd_loop.
543          * The following code is a way to create a string of this format:
544          *              "Thread <num>"
545          * minus the null terminator. What we do is we create this string with
546          * snprintf and then resize it to exclude the null terminator with
547          * realloc. Finally, the result string is passed to the (void *arg) field
548          * of struct thread.
549          *
550          * Since the highest thread number can't be more than 5 digits, using 13
551          * chars should be more than enough.
552          */
553         thread_id = malloc(13 * sizeof(char));
554         snprintf(thread_id, 13, "Thread %ld", thread_num);
555         for (i = 0; thread_id[i]; i++) {}
556         t->arg = (void *)realloc(thread_id, i-1);
557         pthread_setspecific(threadkey, t);
558
559         //Start thread loop
560         (void)peer->peerd_loop(t);
561
562         wake_up_next_thread(peer);
563         custom_peer_finalize(peer);
564
565         return NULL;
566 }
567
568 int peerd_start_threads(struct peerd *peer)
569 {
570         int i;
571         uint32_t nr_threads = peer->nr_threads;
572         //TODO err check
573         for (i = 0; i < nr_threads; i++) {
574                 peer->thread[i].thread_no = i;
575                 peer->thread[i].peer = peer;
576                 pthread_create(&peer->thread[i].tid, NULL,
577                                         init_thread_loop, (void *)(peer->thread + i));
578         }
579
580         if (peer->interactive_func)
581                 peer->interactive_func();
582         for (i = 0; i < nr_threads; i++) {
583                 pthread_join(peer->thread[i].tid, NULL);
584         }
585
586         return 0;
587 }
588 #endif
589
590 int defer_request(struct peerd *peer, struct peer_req *pr)
591 {
592         int r;
593         xport p;
594         if (!canDefer(peer)){
595                 XSEGLOG2(&lc, E, "Peer cannot defer requests");
596                 return -1;
597         }
598         p = xseg_forward(peer->xseg, pr->req, peer->defer_portno, pr->portno,
599                         X_ALLOC);
600         if (p == NoPort){
601                 XSEGLOG2(&lc, E, "Cannot defer request %lx", pr->req);
602                 return -1;
603         }
604         r = xseg_signal(peer->xseg, p);
605         if (r < 0) {
606                 XSEGLOG2(&lc, W, "Cannot signal port %lu", p);
607         }
608         free_peer_req(peer, pr);
609         return 0;
610 }
611
612 /*
613  * generic_peerd_loop is a general-purpose port-checker loop that is
614  * suitable both for multi-threaded and single-threaded peers.
615  */
616 static int generic_peerd_loop(void *arg)
617 {
618 #ifdef MT
619         struct thread *t = (struct thread *) arg;
620         struct peerd *peer = t->peer;
621         char *id = t->arg;
622 #else
623         struct peerd *peer = (struct peerd *) arg;
624         char id[4] = {'P','e','e','r'};
625 #endif
626         struct xseg *xseg = peer->xseg;
627         xport portno_start = peer->portno_start;
628         xport portno_end = peer->portno_end;
629         pid_t pid = syscall(SYS_gettid);
630         uint64_t threshold = peer->threshold;
631         threshold /= (1 + portno_end - portno_start);
632         threshold += 1;
633         uint64_t loops;
634
635         XSEGLOG2(&lc, I, "%s has tid %u.\n", id, pid);
636         xseg_init_local_signal(xseg, peer->portno_start);
637         //for (;!(isTerminate() && xq_count(&peer->free_reqs) == peer->nr_ops);) {
638         for (;!(isTerminate() && all_peer_reqs_free(peer));) {
639                 //Heart of peerd_loop. This loop is common for everyone.
640                 for(loops = threshold; loops > 0; loops--) {
641                         if (loops == 1)
642                                 xseg_prepare_wait(xseg, peer->portno_start);
643 #ifdef MT
644                         if (check_ports(peer, t))
645 #else
646                         if (check_ports(peer))
647 #endif
648                                 loops = threshold;
649                 }
650 #ifdef ST_THREADS
651                 if (ta){
652                         st_sleep(0);
653                         continue;
654                 }
655 #endif
656                 XSEGLOG2(&lc, I, "%s goes to sleep\n", id);
657                 xseg_wait_signal(xseg, 10000000UL);
658                 xseg_cancel_wait(xseg, peer->portno_start);
659                 XSEGLOG2(&lc, I, "%s woke up\n", id);
660         }
661         return 0;
662 }
663
664 static int init_peerd_loop(struct peerd *peer)
665 {
666         struct xseg *xseg = peer->xseg;
667         cpu_set_t mask;
668         int r;
669
670         if (cpu_list.len) {
671                 CPU_ZERO(&mask);
672                 CPU_SET(cpu_list.cpus[0], &mask);
673
674                 r = sched_setaffinity(0, sizeof(mask), &mask);
675                 if (r < 0) {
676                         perror("sched_setaffinity");
677                         return -1;
678                 }
679         }
680
681         peer->peerd_loop(peer);
682         custom_peer_finalize(peer);
683         xseg_quit_local_signal(xseg, peer->portno_start);
684
685         return 0;
686 }
687
688 #ifdef ST_THREADS
689 static void * st_peerd_loop(void *peer)
690 {
691         struct peerd *peerd = peer;
692
693         return init_peerd_loop(peerd) ? (void *)-1 : (void *)0;
694 }
695 #endif
696
697 static struct xseg *join(char *spec)
698 {
699         struct xseg_config config;
700         struct xseg *xseg;
701
702         (void)xseg_parse_spec(spec, &config);
703         xseg = xseg_join(config.type, config.name, PEER_TYPE, NULL);
704         if (xseg)
705                 return xseg;
706
707         (void)xseg_create(&config);
708         return xseg_join(config.type, config.name, PEER_TYPE, NULL);
709 }
710
711 static struct peerd* peerd_init(uint32_t nr_ops, char* spec, long portno_start,
712                         long portno_end, uint32_t nr_threads, xport defer_portno,
713                         uint64_t threshold)
714 {
715         int i;
716         struct peerd *peer;
717         struct xseg_port *port;
718         void *sd = NULL;
719         xport p;
720
721 #ifdef ST_THREADS
722         st_init();
723 #endif
724         peer = malloc(sizeof(struct peerd));
725         if (!peer) {
726                 perror("malloc");
727                 return NULL;
728         }
729         peer->nr_ops = nr_ops;
730         peer->defer_portno = defer_portno;
731         peer->threshold = threshold;
732 #ifdef MT
733         peer->nr_threads = nr_threads;
734         peer->thread = calloc(nr_threads, sizeof(struct thread));
735         if (!peer->thread)
736                 goto malloc_fail;
737         if (!xq_alloc_empty(&peer->threads, nr_threads))
738                 goto malloc_fail;
739         for (i = 0; i < nr_threads; i++) {
740                 if (!xq_alloc_empty(&peer->thread[i].free_thread_reqs, nr_ops))
741                         goto malloc_fail;
742         }
743         for (i = 0; i < nr_ops; i++) {
744                 __xq_append_head(&peer->thread[i % nr_threads].free_thread_reqs, (xqindex)i);
745         }
746
747         pthread_key_create(&threadkey, NULL);
748 #else
749         if (!xq_alloc_seq(&peer->free_reqs, nr_ops, nr_ops))
750                 goto malloc_fail;
751         if (peer->free_reqs.size < peer->nr_ops)
752                 peer->nr_ops = peer->free_reqs.size;
753 #endif
754         peer->peer_reqs = calloc(nr_ops, sizeof(struct peer_req));
755         if (!peer->peer_reqs){
756 malloc_fail:
757                 perror("malloc");
758                 return NULL;
759         }
760         if (xseg_initialize()){
761                 printf("cannot initialize library\n");
762                 return NULL;
763         }
764         peer->xseg = join(spec);
765         if (!peer->xseg)
766                 return NULL;
767
768         peer->portno_start = (xport) portno_start;
769         peer->portno_end= (xport) portno_end;
770
771         /*
772          * Start binding ports from portno_start to portno_end.
773          * The first port we bind will have its signal_desc initialized by xseg
774          * and the same signal_desc will be used for all the other ports.
775          */
776         for (p = peer->portno_start; p <= peer->portno_end; p++) {
777                 port = xseg_bind_port(peer->xseg, p, sd);
778                 if (!port){
779                         printf("cannot bind to port %u\n", (unsigned int) p);
780                         return NULL;
781                 }
782                 if (p == peer->portno_start)
783                         sd = xseg_get_signal_desc(peer->xseg, port);
784         }
785
786         printf("Peer on ports  %u-%u\n", peer->portno_start, peer->portno_end);
787
788         for (i = 0; i < nr_ops; i++) {
789                 peer->peer_reqs[i].peer = peer;
790                 peer->peer_reqs[i].req = NULL;
791                 peer->peer_reqs[i].retval = 0;
792                 peer->peer_reqs[i].priv = NULL;
793                 peer->peer_reqs[i].portno = NoPort;
794 #ifdef ST_THREADS
795                 peer->peer_reqs[i].cond = st_cond_new(); //FIXME err check
796 #endif
797         }
798
799         //Plug default peerd_loop. This can change later on by custom_peer_init.
800         peer->peerd_loop = generic_peerd_loop;
801
802 #ifdef MT
803         peer->interactive_func = NULL;
804 #endif
805         return peer;
806 }
807
808 int pidfile_remove(char *path, int fd)
809 {
810         close(fd);
811         return (unlink(path));
812 }
813
814 int pidfile_write(int pid_fd)
815 {
816         char buf[16];
817         snprintf(buf, sizeof(buf), "%ld", syscall(SYS_gettid));
818         buf[15] = 0;
819
820         lseek(pid_fd, 0, SEEK_SET);
821         int ret = write(pid_fd, buf, strlen(buf));
822         return ret;
823 }
824
825 int pidfile_read(char *path, pid_t *pid)
826 {
827         char buf[16], *endptr;
828         *pid = 0;
829
830         int fd = open(path, O_RDONLY);
831         if (fd < 0)
832                 return -1;
833         int ret = read(fd, buf, 15);
834         buf[15]=0;
835         close(fd);
836         if (ret < 0)
837                 return -1;
838         else{
839                 *pid = strtol(buf, &endptr, 10);
840                 if (endptr != &buf[ret]){
841                         *pid = 0;
842                         return -1;
843                 }
844         }
845         return 0;
846 }
847
848 int pidfile_open(char *path, pid_t *old_pid)
849 {
850         //nfs version > 3
851         int fd = open(path, O_CREAT|O_EXCL|O_WRONLY, S_IRUSR|S_IWUSR);
852         if (fd < 0){
853                 if (errno == EEXIST)
854                         pidfile_read(path, old_pid);
855         }
856         return fd;
857 }
858
859 int get_cpu_list(char *cpus, struct cpu_list *cpu_list)
860 {
861         char *tok, *rem;
862         int i = 0;
863
864         while ((tok = strtok(cpus, ",")) != NULL) {
865                 cpu_list->cpus[i++] = strtol(tok, &rem, 10);
866                 if (strlen(rem) > 0)    /* Not a number */
867                         return -1;
868                 cpus = NULL;
869         }
870
871         cpu_list->len = i;
872
873         return 0;
874 }
875
876 void usage(char *argv0)
877 {
878         fprintf(stderr, "Usage: %s [general options] [custom peer options]\n\n", argv0);
879         fprintf(stderr, "General peer options:\n"
880                 "  Option      | Default | \n"
881                 "  --------------------------------------------\n"
882                 "    -g        | None    | Segment spec to join\n"
883                 "    -sp       | NoPort  | Start portno to bind\n"
884                 "    -ep       | NoPort  | End portno to bind\n"
885                 "    -p        | NoPort  | Portno to bind\n"
886                 "    -n        | 16      | Number of ops\n"
887                 "    -v        | 0       | Verbosity level\n"
888                 "    -l        | None    | Logfile \n"
889                 "    -d        | No      | Daemonize \n"
890                 "    --pidfile | None    | Pidfile \n"
891 #ifdef MT
892                 "    -t        | No      | Number of threads \n"
893 #endif
894                 "    --cpus    | No      | Coma-separated list of CPUs\n"
895                 "              |         | to pin the process or threads\n"
896                 "\n"
897                );
898         custom_peer_usage();
899 }
900
901 int main(int argc, char *argv[])
902 {
903         struct peerd *peer = NULL;
904         //parse args
905         int r;
906         long portno_start = -1, portno_end = -1, portno = -1;
907
908         //set defaults here
909         int daemonize = 0, help = 0;
910         uint32_t nr_ops = 16;
911         uint32_t nr_threads = 1;
912         uint64_t threshold = 1000;
913         unsigned int debug_level = 0;
914         xport defer_portno = NoPort;
915         pid_t old_pid;
916         int pid_fd = -1;
917
918         char spec[MAX_SPEC_LEN + 1];
919         char logfile[MAX_LOGFILE_LEN + 1];
920         char pidfile[MAX_PIDFILE_LEN + 1];
921         char cpus[MAX_CPUS_LEN + 1];
922
923         logfile[0] = 0;
924         pidfile[0] = 0;
925         spec[0] = 0;
926         cpus[0] = 0;
927
928         //capture here -g spec, -n nr_ops, -p portno, -t nr_threads -v verbose level
929         // -dp xseg_portno to defer blocking requests
930         // -l log file ?
931         //TODO print messages on arg parsing error
932         BEGIN_READ_ARGS(argc, argv);
933         READ_ARG_STRING("-g", spec, MAX_SPEC_LEN);
934         READ_ARG_ULONG("-sp", portno_start);
935         READ_ARG_ULONG("-ep", portno_end);
936         READ_ARG_ULONG("-p", portno);
937         READ_ARG_ULONG("-n", nr_ops);
938         READ_ARG_ULONG("-v", debug_level);
939 #ifdef MT
940         READ_ARG_ULONG("-t", nr_threads);
941 #endif
942         READ_ARG_ULONG("-dp", defer_portno);
943         READ_ARG_STRING("-l", logfile, MAX_LOGFILE_LEN);
944         READ_ARG_BOOL("-d", daemonize);
945         READ_ARG_BOOL("-h", help);
946         READ_ARG_BOOL("--help", help);
947         READ_ARG_ULONG("--threshold", threshold);
948         READ_ARG_STRING("--cpus", cpus, MAX_CPUS_LEN);
949         READ_ARG_STRING("--pidfile", pidfile, MAX_PIDFILE_LEN);
950         END_READ_ARGS();
951
952         if (help){
953                 usage(argv[0]);
954                 return 0;
955         }
956
957         r = init_logctx(&lc, argv[0], debug_level, logfile,
958                         REDIRECT_STDOUT|REDIRECT_STDERR);
959         if (r < 0){
960                 XSEGLOG("Cannot initialize logging to logfile");
961                 return -1;
962         }
963         XSEGLOG2(&lc, D, "Main thread has tid %ld.\n", syscall(SYS_gettid));
964
965         if (pidfile[0]){
966                 pid_fd = pidfile_open(pidfile, &old_pid);
967                 if (pid_fd < 0) {
968                         if (old_pid) {
969                                 XSEGLOG2(&lc, E, "Daemon already running, pid: %d.", old_pid);
970                         } else {
971                                 XSEGLOG2(&lc, E, "Cannot open or create pidfile");
972                         }
973                         return -1;
974                 }
975         }
976
977         if (daemonize){
978                 if (close(STDIN_FILENO)){
979                         XSEGLOG2(&lc, W, "Could not close stdin");
980                 }
981                 if (daemon(0, 1) < 0){
982                         XSEGLOG2(&lc, E, "Cannot daemonize");
983                         r = -1;
984                         goto out;
985                 }
986         }
987
988         pidfile_write(pid_fd);
989
990         if (cpus[0]) {
991                 r = get_cpu_list(cpus, &cpu_list);
992
993                 if (r < 0) {
994                         XSEGLOG2(&lc, E, "--cpus %s: Invalid input", cpus);
995                         goto out;
996                 }
997 #ifdef MT
998                 if (nr_threads != cpu_list.len) {
999                         XSEGLOG2(&lc, E, "--cpus %s: Number of "
1000                                         "threads (%d) and CPUs don't match",
1001                                         cpus, nr_threads);
1002                         goto out;
1003                 }
1004 #else
1005                 if (cpu_list.len != 1) {
1006                         XSEGLOG2(&lc, E, "--cpus %s: Too many CPUs for a "
1007                                         "single process", cpus);
1008                         goto out;
1009                 }
1010 #endif
1011         }
1012
1013
1014         //TODO perform argument sanity checks
1015         verbose = debug_level;
1016         if (portno != -1) {
1017                 portno_start = portno;
1018                 portno_end = portno;
1019         }
1020         if (portno_start == -1 || portno_end == -1){
1021                 XSEGLOG2(&lc, E, "Portno or {portno_start, portno_end} must be supplied");
1022                 usage(argv[0]);
1023                 r = -1;
1024                 goto out;
1025         }
1026
1027         peer = peerd_init(nr_ops, spec, portno_start, portno_end, nr_threads, defer_portno, threshold);
1028         if (!peer){
1029                 r = -1;
1030                 goto out;
1031         }
1032         setup_signals(peer);
1033         r = custom_peer_init(peer, argc, argv);
1034         if (r < 0)
1035                 goto out;
1036 #if defined(MT)
1037         //TODO err check
1038         peerd_start_threads(peer);
1039 #elif defined(ST_THREADS)
1040         st_thread_t st = st_thread_create(st_peerd_loop, peer, 1, 0);
1041         r = st_thread_join(st, NULL);
1042 #else
1043         r = init_peerd_loop(peer);
1044 #endif
1045 out:
1046         if (pid_fd > 0)
1047                 pidfile_remove(pidfile, pid_fd);
1048         return r;
1049 }