extend gracefull exit to SIGINT, SIGQUIT
[archipelago] / xseg / peers / user / peer.c
1 #define _GNU_SOURCE
2 #include <stdio.h>
3 #include <stdlib.h>
4 #include <sys/types.h>
5 #include <unistd.h>
6 #include <xseg/xseg.h>
7 #include <peer.h>
8 #include <sys/syscall.h>
9 #include <sys/time.h>
10 #include <signal.h>
11 #ifdef MT
12 #include <pthread.h>
13 #endif
14
15 #ifdef MT
16 #define PEER_TYPE "pthread"
17 #else
18 #define PEER_TYPE "posix"
19 #endif
20
21 unsigned int terminated = 0;
22 unsigned int verbose = 0;
23 struct log_ctx lc;
24 #ifdef ST_THREADS
25 uint32_t ta = 0;
26 #endif
27
28 #ifdef MT
29 struct thread {
30         struct peerd *peer;
31         pthread_t tid;
32         pthread_cond_t cond;
33         pthread_mutex_t lock;
34         void (*func)(void *arg);
35         void *arg;
36 };
37
38
39 inline static struct thread* alloc_thread(struct peerd *peer)
40 {
41         xqindex idx = xq_pop_head(&peer->threads, 1);
42         if (idx == Noneidx)
43                 return NULL;
44         return peer->thread + idx;
45 }
46
47 inline static void free_thread(struct peerd *peer, struct thread *t)
48 {
49         xqindex idx = t - peer->thread;
50         xq_append_head(&peer->threads, idx, 1);
51 }
52
53
54 inline static void __wake_up_thread(struct thread *t)
55 {
56         pthread_mutex_lock(&t->lock);
57         pthread_cond_signal(&t->cond);
58         pthread_mutex_unlock(&t->lock);
59 }
60
61 inline static void wake_up_thread(struct thread* t)
62 {
63         if (t){
64                 __wake_up_thread(t);
65         }
66 }
67
68 inline static int wake_up_next_thread(struct peerd *peer)
69 {
70         return (xseg_signal(peer->xseg, peer->portno_start));
71 }
72 #endif
73
74
75 static inline int isTerminate()
76 {
77 /* ta doesn't need to be taken into account, because the main loops
78  * doesn't check the terminated flag if ta is not 0.
79  *
80 #ifdef ST_THREADS
81         return (!ta & terminated);
82 #else
83         return terminated;
84 #endif
85         */
86         return terminated;
87 }
88
89 void signal_handler(int signal)
90 {
91         XSEGLOG2(&lc, I, "Caught signal. Terminating gracefully");
92         terminated = 1;
93 }
94
95 static int setup_signals()
96 {
97         int r;
98         struct sigaction sa;
99         sigemptyset(&sa.sa_mask);
100         sa.sa_flags = 0;
101         sa.sa_handler = signal_handler;
102         r = sigaction(SIGTERM, &sa, NULL);
103         if (r < 0)
104                 return r;
105         r = sigaction(SIGINT, &sa, NULL);
106         if (r < 0)
107                 return r;
108         r = sigaction(SIGQUIT, &sa, NULL);
109         if (r < 0)
110                 return r;
111         return r;
112 }
113
114 inline int canDefer(struct peerd *peer)
115 {
116         return !(peer->defer_portno == NoPort);
117 }
118
119 void print_req(struct xseg *xseg, struct xseg_request *req)
120 {
121         char target[64], data[64];
122         char *req_target, *req_data;
123         unsigned int end = (req->targetlen> 63) ? 63 : req->targetlen;
124         req_target = xseg_get_target(xseg, req);
125         req_data = xseg_get_data(xseg, req);
126
127         if (1) {
128                 strncpy(target, req_target, end);
129                 target[end] = 0;
130                 strncpy(data, req_data, 63);
131                 data[63] = 0;
132                 printf("req id:%lu, op:%u %llu:%lu serviced: %lu, reqstate: %u\n"
133                                 "src: %u, st: %u, dst: %u dt: %u\n"
134                                 "target[%u]:'%s', data[%llu]:\n%s------------------\n\n",
135                                 (unsigned long)(req),
136                                 (unsigned int)req->op,
137                                 (unsigned long long)req->offset,
138                                 (unsigned long)req->size,
139                                 (unsigned long)req->serviced,
140                                 (unsigned int)req->state,
141                                 (unsigned int)req->src_portno,
142                                 (unsigned int)req->src_transit_portno,
143                                 (unsigned int)req->dst_portno,
144                                 (unsigned int)req->dst_transit_portno,
145                                 (unsigned int)req->targetlen, target,
146                                 (unsigned long long)req->datalen, data);
147         }
148 }
149 void log_pr(char *msg, struct peer_req *pr)
150 {
151         char target[64], data[64];
152         char *req_target, *req_data;
153         struct peerd *peer = pr->peer;
154         struct xseg *xseg = pr->peer->xseg;
155         req_target = xseg_get_target(xseg, pr->req);
156         req_data = xseg_get_data(xseg, pr->req);
157         /* null terminate name in case of req->target is less than 63 characters,
158          * and next character after name (aka first byte of next buffer) is not
159          * null
160          */
161         unsigned int end = (pr->req->targetlen> 63) ? 63 : pr->req->targetlen;
162         if (verbose) {
163                 strncpy(target, req_target, end);
164                 target[end] = 0;
165                 strncpy(data, req_data, 63);
166                 data[63] = 0;
167                 printf("%s: req id:%u, op:%u %llu:%lu serviced: %lu, retval: %lu, reqstate: %u\n"
168                                 "target[%u]:'%s', data[%llu]:\n%s------------------\n\n",
169                                 msg,
170                                 (unsigned int)(pr - peer->peer_reqs),
171                                 (unsigned int)pr->req->op,
172                                 (unsigned long long)pr->req->offset,
173                                 (unsigned long)pr->req->size,
174                                 (unsigned long)pr->req->serviced,
175                                 (unsigned long)pr->retval,
176                                 (unsigned int)pr->req->state,
177                                 (unsigned int)pr->req->targetlen, target,
178                                 (unsigned long long)pr->req->datalen, data);
179         }
180 }
181
182 inline struct peer_req *alloc_peer_req(struct peerd *peer)
183 {
184         xqindex idx = xq_pop_head(&peer->free_reqs, 1);
185         if (idx == Noneidx)
186                 return NULL;
187         return peer->peer_reqs + idx;
188 }
189
190 inline void free_peer_req(struct peerd *peer, struct peer_req *pr)
191 {
192         xqindex idx = pr - peer->peer_reqs;
193         pr->req = NULL;
194         xq_append_head(&peer->free_reqs, idx, 1);
195 }
196
197 struct timeval resp_start, resp_end, resp_accum = {0, 0};
198 uint64_t responds = 0;
199 void get_responds_stats(){
200                 printf("Time waiting respond %lu.%06lu sec for %llu times.\n",
201                                 //(unsigned int)(t - peer->thread),
202                                 resp_accum.tv_sec, resp_accum.tv_usec, (long long unsigned int) responds);
203 }
204
205 //FIXME error check
206 void fail(struct peerd *peer, struct peer_req *pr)
207 {
208         struct xseg_request *req = pr->req;
209         uint32_t p;
210         XSEGLOG2(&lc, D, "failing req %u", (unsigned int) (pr - peer->peer_reqs));
211         req->state |= XS_FAILED;
212         //xseg_set_req_data(peer->xseg, pr->req, NULL);
213         p = xseg_respond(peer->xseg, req, pr->portno, X_ALLOC);
214         xseg_signal(peer->xseg, p);
215         free_peer_req(peer, pr);
216 #ifdef MT
217         wake_up_next_thread(peer);
218 #endif
219 }
220
221 //FIXME error check
222 void complete(struct peerd *peer, struct peer_req *pr)
223 {
224         struct xseg_request *req = pr->req;
225         uint32_t p;
226         req->state |= XS_SERVED;
227         //xseg_set_req_data(peer->xseg, pr->req, NULL);
228         //gettimeofday(&resp_start, NULL);
229         p = xseg_respond(peer->xseg, req, pr->portno, X_ALLOC);
230         //gettimeofday(&resp_end, NULL);
231         //responds++;
232         //timersub(&resp_end, &resp_start, &resp_end);
233         //timeradd(&resp_end, &resp_accum, &resp_accum);
234         //printf("xseg_signal: %u\n", p);
235         xseg_signal(peer->xseg, p);
236         free_peer_req(peer, pr);
237 #ifdef MT
238         wake_up_next_thread(peer);
239 #endif
240 }
241
242 static void handle_accepted(struct peerd *peer, struct peer_req *pr, 
243                                 struct xseg_request *req)
244 {
245         struct xseg_request *xreq = pr->req;
246         //assert xreq == req;
247         XSEGLOG2(&lc, D, "Handle accepted");
248         xreq->serviced = 0;
249         //xreq->state = XS_ACCEPTED;
250         pr->retval = 0;
251         dispatch(peer, pr, req, dispatch_accept);
252 }
253
254 static void handle_received(struct peerd *peer, struct peer_req *pr,
255                                 struct xseg_request *req)
256 {
257         //struct xseg_request *req = pr->req;
258         //assert req->state != XS_ACCEPTED;
259         XSEGLOG2(&lc, D, "Handle received \n");
260         dispatch(peer, pr, req, dispatch_receive);
261
262 }
263 struct timeval sub_start, sub_end, sub_accum = {0, 0};
264 uint64_t submits = 0;
265 void get_submits_stats(){
266                 printf("Time waiting submit %lu.%06lu sec for %llu times.\n",
267                                 //(unsigned int)(t - peer->thread),
268                                 sub_accum.tv_sec, sub_accum.tv_usec, (long long unsigned int) submits);
269 }
270
271 int submit_peer_req(struct peerd *peer, struct peer_req *pr)
272 {
273         uint32_t ret;
274         struct xseg_request *req = pr->req;
275         // assert req->portno == peer->portno ?
276         //TODO small function with error checking
277         XSEGLOG2 (&lc, D, "submitting peer req %u\n", (unsigned int)(pr - peer->peer_reqs));
278         ret = xseg_set_req_data(peer->xseg, req, (void *)(pr));
279         if (ret < 0)
280                 return -1;
281         //printf("pr: %x , req_data: %x \n", pr, xseg_get_req_data(peer->xseg, req));
282         //gettimeofday(&sub_start, NULL);
283         ret = xseg_submit(peer->xseg, req, pr->portno, X_ALLOC);
284         //gettimeofday(&sub_end, NULL);
285         //submits++;
286         //timersub(&sub_end, &sub_start, &sub_end);
287         //timeradd(&sub_end, &sub_accum, &sub_accum);
288         if (ret == NoPort)
289                 return -1;
290         xseg_signal(peer->xseg, ret);
291         return 0;
292 }
293
294 static int check_ports(struct peerd *peer)
295 {
296         struct xseg *xseg = peer->xseg;
297         xport portno_start = peer->portno_start;
298         xport portno_end = peer->portno_end;
299         struct xseg_request *accepted, *received;
300         struct peer_req *pr;
301         xport i;
302         int  r, c = 0;
303
304         for (i = portno_start; i <= portno_end; i++) {
305                 accepted = NULL;
306                 received = NULL;
307                 if (!isTerminate()) {
308                         pr = alloc_peer_req(peer);
309                         if (pr) {
310                                 accepted = xseg_accept(xseg, i, X_NONBLOCK);
311                                 if (accepted) {
312                                         pr->req = accepted;
313                                         pr->portno = i;
314                                         xseg_cancel_wait(xseg, i);
315                                         handle_accepted(peer, pr, accepted);
316                                         c = 1;
317                                 }
318                                 else {
319                                         free_peer_req(peer, pr);
320                                 }
321                         }
322                 }
323                 received = xseg_receive(xseg, i, X_NONBLOCK);
324                 if (received) {
325                         r =  xseg_get_req_data(xseg, received, (void **) &pr);
326                         if (r < 0 || !pr){
327                                 XSEGLOG2(&lc, W, "Received request with no pr data\n");
328                                 xport p = xseg_respond(peer->xseg, received, peer->portno_start, X_ALLOC);
329                                 if (p == NoPort){
330                                         XSEGLOG2(&lc, W, "Could not respond stale request");
331                                         xseg_put_request(xseg, received, portno_start);
332                                         continue;
333                                 } else {
334                                         xseg_signal(xseg, p);
335                                 }
336                         } else {
337                                 //maybe perform sanity check for pr
338                                 xseg_cancel_wait(xseg, i);
339                                 handle_received(peer, pr, received);
340                                 c = 1;
341                         }
342                 }
343         }
344
345         return c;
346 }
347
348 #ifdef MT
349 int thread_execute(struct peerd *peer, void (*func)(void *arg), void *arg)
350 {
351         struct thread *t = alloc_thread(peer);
352         if (t) {
353                 t->func = func;
354                 t->arg = arg;
355                 wake_up_thread(t);
356                 return 0;
357         } else
358                 // we could hijack a thread
359                 return -1;
360 }
361
362 static void* thread_loop(void *arg)
363 {
364         struct thread *t = (struct thread *) arg;
365         struct peerd *peer = t->peer;
366         struct xseg *xseg = peer->xseg;
367         xport portno_start = peer->portno_start;
368         xport portno_end = peer->portno_end;
369         pid_t pid =syscall(SYS_gettid);
370         uint64_t loops;
371         uint64_t threshold=1000/(1 + portno_end - portno_start);
372         
373         XSEGLOG2(&lc, D, "thread %u\n",  (unsigned int) (t- peer->thread));
374
375         XSEGLOG2(&lc, I, "Thread %u has tid %u.\n", (unsigned int) (t- peer->thread), pid);
376         xseg_init_local_signal(xseg, peer->portno_start);
377         for (;!(isTerminate() && xq_count(&peer->free_reqs) == peer->nr_ops);) {
378                 if (t->func) {
379                         XSEGLOG2(&lc, D, "Thread %u executes function\n", (unsigned int) (t- peer->thread));
380                         xseg_cancel_wait(xseg, peer->portno_start);
381                         t->func(t->arg);
382                         t->func = NULL;
383                         t->arg = NULL;
384                         continue;
385                 }
386
387                 for(loops= threshold; loops > 0; loops--) {
388                         if (loops == 1)
389                                 xseg_prepare_wait(xseg, peer->portno_start);
390                         if (check_ports(peer))
391                                 loops = threshold;
392                 }
393                 XSEGLOG2(&lc, I, "Thread %u goes to sleep\n", (unsigned int) (t- peer->thread));
394                 xseg_wait_signal(xseg, 10000000UL);
395                 xseg_cancel_wait(xseg, peer->portno_start);
396                 XSEGLOG2(&lc, I, "Thread %u woke up\n", (unsigned int) (t- peer->thread));
397         }
398         return NULL;
399 }
400
401 int peerd_start_threads(struct peerd *peer)
402 {
403         int i;
404         uint32_t nr_threads = peer->nr_threads;
405         //TODO err check
406         for (i = 0; i < nr_threads; i++) {
407                 peer->thread[i].peer = peer;
408                 pthread_cond_init(&peer->thread[i].cond,NULL);
409                 pthread_mutex_init(&peer->thread[i].lock, NULL);
410                 pthread_create(&peer->thread[i].tid, NULL, thread_loop, (void *)(peer->thread + i));
411                 peer->thread[i].func = NULL;
412                 peer->thread[i].arg = NULL;
413
414         }
415         return 0;
416 }
417 #endif
418
419 void defer_request(struct peerd *peer, struct peer_req *pr)
420 {
421         // assert canDefer(peer);
422 //      xseg_submit(peer->xseg, peer->defer_portno, pr->req);
423 //      xseg_signal(peer->xseg, peer->defer_portno);
424 //      free_peer_req(peer, pr);
425 }
426
427 static int peerd_loop(struct peerd *peer) 
428 {
429 #ifdef MT
430         int i;
431         if (peer->interactive_func)
432                 peer->interactive_func();
433         for (i = 0; i < peer->nr_threads; i++) {
434                 pthread_join(peer->thread[i].tid, NULL);
435         }
436 #else
437         struct xseg *xseg = peer->xseg;
438         xport portno_start = peer->portno_start;
439         xport portno_end = peer->portno_end;
440         uint64_t threshold=1000/(1 + portno_end - portno_start);
441         pid_t pid =syscall(SYS_gettid);
442         uint64_t loops;
443         
444         XSEGLOG2(&lc, I, "Peer has tid %u.\n", pid);
445         xseg_init_local_signal(xseg, peer->portno_start);
446         for (;!(isTerminate() && xq_count(&peer->free_reqs) == peer->nr_ops);) {
447                 for(loops= threshold; loops > 0; loops--) {
448                         if (loops == 1)
449                                 xseg_prepare_wait(xseg, peer->portno_start);
450                         if (check_ports(peer))
451                                 loops = threshold;
452                 }
453 #ifdef ST_THREADS
454                 if (ta){
455                         st_sleep(0);
456                 } else {
457 #endif
458                         XSEGLOG2(&lc, I, "Peer goes to sleep\n");
459                         xseg_wait_signal(xseg, 10000000UL);
460                         xseg_cancel_wait(xseg, peer->portno_start);
461                         XSEGLOG2(&lc, I, "Peer woke up\n");
462 #ifdef ST_THREADS
463                 }
464 #endif
465         }
466         xseg_quit_local_signal(xseg, peer->portno_start);
467 #endif
468         return 0;
469 }
470
471 static struct xseg *join(char *spec)
472 {
473         struct xseg_config config;
474         struct xseg *xseg;
475
476         (void)xseg_parse_spec(spec, &config);
477         xseg = xseg_join(config.type, config.name, PEER_TYPE, NULL);
478         if (xseg)
479                 return xseg;
480
481         (void)xseg_create(&config);
482         return xseg_join(config.type, config.name, PEER_TYPE, NULL);
483 }
484
485 static struct peerd* peerd_init(uint32_t nr_ops, char* spec, long portno_start,
486                         long portno_end, uint32_t nr_threads, uint32_t defer_portno)
487 {
488         int i;
489         struct peerd *peer;
490         struct xseg_port *port;
491
492 #ifdef ST_THREADS
493         st_init();
494 #endif
495         peer = malloc(sizeof(struct peerd));
496         if (!peer) {
497                 perror("malloc");
498                 return NULL;
499         }
500         peer->nr_ops = nr_ops;
501         peer->defer_portno = defer_portno;
502 #ifdef MT
503         peer->nr_threads = nr_threads;
504         peer->thread = calloc(nr_threads, sizeof(struct thread));
505         if (!peer->thread)
506                 goto malloc_fail;
507 #endif
508         peer->peer_reqs = calloc(nr_ops, sizeof(struct peer_req));
509         if (!peer->peer_reqs){
510 malloc_fail:
511                 perror("malloc");
512                 return NULL;
513         }
514
515         if (!xq_alloc_seq(&peer->free_reqs, nr_ops, nr_ops))
516                 goto malloc_fail;
517 #ifdef MT
518         if (!xq_alloc_empty(&peer->threads, nr_threads))
519                 goto malloc_fail;
520 #endif
521         if (xseg_initialize()){
522                 printf("cannot initialize library\n");
523                 return NULL;
524         }
525         peer->xseg = join(spec);
526         if (!peer->xseg) 
527                 return NULL;
528
529         peer->portno_start = (xport) portno_start;
530         peer->portno_end= (xport) portno_end;
531         port = xseg_bind_port(peer->xseg, peer->portno_start, NULL);
532         if (!port){
533                 printf("cannot bind to port %ld\n", peer->portno_start);
534                 return NULL;
535         }
536
537         xport p;
538         for (p = peer->portno_start + 1; p <= peer->portno_end; p++) {
539                 struct xseg_port *tmp;
540                 tmp = xseg_bind_port(peer->xseg, p, (void *)xseg_get_signal_desc(peer->xseg, port));
541                 if (!tmp){
542                         printf("cannot bind to port %ld\n", p);
543                         return NULL;
544                 }
545         }
546
547         printf("Peer on ports  %u-%u\n", peer->portno_start,
548                         peer->portno_end);
549
550         for (i = 0; i < nr_ops; i++) {
551                 peer->peer_reqs[i].peer = peer;
552                 peer->peer_reqs[i].req = NULL;
553                 peer->peer_reqs[i].retval = 0;
554                 peer->peer_reqs[i].priv = NULL;
555                 peer->peer_reqs[i].portno = NoPort;
556 #ifdef ST_THREADS
557                 peer->peer_reqs[i].cond = st_cond_new(); //FIXME err check
558 #endif
559         }
560 #ifdef MT
561         peer->interactive_func = NULL;
562 #endif
563         return peer;
564 }
565
566
567 int main(int argc, char *argv[])
568 {
569         struct peerd *peer = NULL;
570         //parse args
571         char *spec = "";
572         int i, r;
573         long portno_start = -1, portno_end = -1, portno = -1;
574         //set defaults here
575         uint32_t nr_ops = 16;
576         uint32_t nr_threads = 16 ;
577         unsigned int debug_level = 0;
578         uint32_t defer_portno = NoPort;
579         char *logfile = NULL;
580
581         //capture here -g spec, -n nr_ops, -p portno, -t nr_threads -v verbose level
582         // -dp xseg_portno to defer blocking requests
583         // -l log file ?
584         //TODO print messages on arg parsing error
585         
586         for (i = 1; i < argc; i++) {
587                 if (!strcmp(argv[i], "-g") && i + 1 < argc) {
588                         spec = argv[i+1];
589                         i += 1;
590                         continue;
591                 }
592
593                 if (!strcmp(argv[i], "-sp") && i + 1 < argc) {
594                         portno_start = strtoul(argv[i+1], NULL, 10);
595                         i += 1;
596                         continue;
597                 }
598                 
599                 if (!strcmp(argv[i], "-ep") && i + 1 < argc) {
600                         portno_end = strtoul(argv[i+1], NULL, 10);
601                         i += 1;
602                         continue;
603                 }
604
605                 if (!strcmp(argv[i], "-p") && i + 1 < argc) {
606                         portno = strtoul(argv[i+1], NULL, 10);
607                         i += 1;
608                         continue;
609                 }
610
611                 if (!strcmp(argv[i], "-n") && i + 1 < argc) {
612                         nr_ops = strtoul(argv[i+1], NULL, 10);
613                         i += 1;
614                         continue;
615                 }
616                 if (!strcmp(argv[i], "-v") && i + 1 < argc ) {
617                         debug_level = atoi(argv[i+1]);
618                         i += 1;
619                         continue;
620                 }
621                 if (!strcmp(argv[i], "-t") && i + 1 < argc ) {
622                         nr_threads = strtoul(argv[i+1], NULL, 10);
623                         i += 1;
624                         continue;
625                 }
626                 if (!strcmp(argv[i], "-dp") && i + 1 < argc ) {
627                         defer_portno = strtoul(argv[i+1], NULL, 10);
628                         i += 1;
629                         continue;
630                 }
631                 if (!strcmp(argv[i], "-l") && i + 1 < argc ) {
632                         logfile = argv[i+1];
633                         i += 1;
634                         continue;
635                 }
636
637         }
638         init_logctx(&lc, argv[0], debug_level, logfile);
639         XSEGLOG2(&lc, D, "Main thread has tid %ld.\n", syscall(SYS_gettid));
640         
641         //TODO perform argument sanity checks
642         verbose = debug_level;
643         if (portno != -1) {
644                 portno_start = portno;
645                 portno_end = portno;
646         }
647
648         setup_signals();
649         //TODO err check
650         peer = peerd_init(nr_ops, spec, portno_start, portno_end, nr_threads, defer_portno);
651         if (!peer)
652                 return -1;
653         r = custom_peer_init(peer, argc, argv);
654         if (r < 0)
655                 return -1;
656 #ifdef MT
657         peerd_start_threads(peer);
658 #endif
659
660 #ifdef ST_THREADS
661         st_thread_t st = st_thread_create(peerd_loop, peer, 1, 0);
662         return st_thread_join(st, NULL);
663 #else
664         return peerd_loop(peer);
665 #endif
666 }