merge peers skeletons
[archipelago] / xseg / peers / user / peer.c
1 #define _GNU_SOURCE
2 #include <stdio.h>
3 #include <stdlib.h>
4 #include <sys/types.h>
5 #include <unistd.h>
6 #include <xseg/xseg.h>
7 #include <peer.h>
8 #include <sys/syscall.h>
9 #include <sys/time.h>
10 #include <signal.h>
11 #ifdef MT
12 #include <pthread.h>
13 #endif
14
15 #ifdef MT
16 #define PEER_TYPE "pthread"
17 #else
18 #define PEER_TYPE "posix"
19 #endif
20
/* Verbosity level: main() copies the -v debug level here, and log_pr()
 * prints only when it is non-zero. */
unsigned int verbose = 0;
/* Logging context handed to every XSEGLOG2() call in this file; main()
 * sets it up through init_logctx(). */
struct log_ctx lc;
#ifdef ST_THREADS
/* When non-zero, peerd_loop() calls st_sleep(0) between polling rounds
 * instead of blocking in xseg_wait_signal(). */
uint32_t ta = 0;
#endif
26
27 #ifdef MT
/* Per-worker bookkeeping for the pthread (MT) build. */
struct thread {
	/* Owning peer; filled in by peerd_start_threads(). */
	struct peerd *peer;
	/* Handle written by pthread_create() in peerd_start_threads(). */
	pthread_t tid;
	/* Condition variable signalled by __wake_up_thread(). */
	pthread_cond_t cond;
	/* Mutex held by __wake_up_thread() around the signal on cond. */
	pthread_mutex_t lock;
	/* One-shot job installed by thread_execute(); thread_loop() runs it
	 * and then resets func and arg to NULL. */
	void (*func)(void *arg);
	/* Argument passed to func. */
	void *arg;
};
36
37
38 inline static struct thread* alloc_thread(struct peerd *peer)
39 {
40         xqindex idx = xq_pop_head(&peer->threads, 1);
41         if (idx == Noneidx)
42                 return NULL;
43         return peer->thread + idx;
44 }
45
46 inline static void free_thread(struct peerd *peer, struct thread *t)
47 {
48         xqindex idx = t - peer->thread;
49         xq_append_head(&peer->threads, idx, 1);
50 }
51
52
53 inline static void __wake_up_thread(struct thread *t)
54 {
55         pthread_mutex_lock(&t->lock);
56         pthread_cond_signal(&t->cond);
57         pthread_mutex_unlock(&t->lock);
58 }
59
/* Wake @t through __wake_up_thread(); a NULL @t is silently ignored. */
inline static void wake_up_thread(struct thread* t)
{
	if (!t)
		return;

	__wake_up_thread(t);
}
66
67 inline static int wake_up_next_thread(struct peerd *peer)
68 {
69         return (xseg_signal(peer->xseg, peer->portno_start));
70 }
71 #endif
72
73 inline int canDefer(struct peerd *peer)
74 {
75         return !(peer->defer_portno == NoPort);
76 }
77
78 void print_req(struct xseg *xseg, struct xseg_request *req)
79 {
80         char target[64], data[64];
81         char *req_target, *req_data;
82         unsigned int end = (req->targetlen> 63) ? 63 : req->targetlen;
83         req_target = xseg_get_target(xseg, req);
84         req_data = xseg_get_data(xseg, req);
85
86         if (1) {
87                 strncpy(target, req_target, end);
88                 target[end] = 0;
89                 strncpy(data, req_data, 63);
90                 data[63] = 0;
91                 printf("req id:%lu, op:%u %llu:%lu serviced: %lu, reqstate: %u\n"
92                                 "src: %u, st: %u, dst: %u dt: %u\n"
93                                 "target[%u]:'%s', data[%llu]:\n%s------------------\n\n",
94                                 (unsigned long)(req),
95                                 (unsigned int)req->op,
96                                 (unsigned long long)req->offset,
97                                 (unsigned long)req->size,
98                                 (unsigned long)req->serviced,
99                                 (unsigned int)req->state,
100                                 (unsigned int)req->src_portno,
101                                 (unsigned int)req->src_transit_portno,
102                                 (unsigned int)req->dst_portno,
103                                 (unsigned int)req->dst_transit_portno,
104                                 (unsigned int)req->targetlen, target,
105                                 (unsigned long long)req->datalen, data);
106         }
107 }
108 void log_pr(char *msg, struct peer_req *pr)
109 {
110         char target[64], data[64];
111         char *req_target, *req_data;
112         struct peerd *peer = pr->peer;
113         struct xseg *xseg = pr->peer->xseg;
114         req_target = xseg_get_target(xseg, pr->req);
115         req_data = xseg_get_data(xseg, pr->req);
116         /* null terminate name in case of req->target is less than 63 characters,
117          * and next character after name (aka first byte of next buffer) is not
118          * null
119          */
120         unsigned int end = (pr->req->targetlen> 63) ? 63 : pr->req->targetlen;
121         if (verbose) {
122                 strncpy(target, req_target, end);
123                 target[end] = 0;
124                 strncpy(data, req_data, 63);
125                 data[63] = 0;
126                 printf("%s: req id:%u, op:%u %llu:%lu serviced: %lu, retval: %lu, reqstate: %u\n"
127                                 "target[%u]:'%s', data[%llu]:\n%s------------------\n\n",
128                                 msg,
129                                 (unsigned int)(pr - peer->peer_reqs),
130                                 (unsigned int)pr->req->op,
131                                 (unsigned long long)pr->req->offset,
132                                 (unsigned long)pr->req->size,
133                                 (unsigned long)pr->req->serviced,
134                                 (unsigned long)pr->retval,
135                                 (unsigned int)pr->req->state,
136                                 (unsigned int)pr->req->targetlen, target,
137                                 (unsigned long long)pr->req->datalen, data);
138         }
139 }
140
141 inline struct peer_req *alloc_peer_req(struct peerd *peer)
142 {
143         xqindex idx = xq_pop_head(&peer->free_reqs, 1);
144         if (idx == Noneidx)
145                 return NULL;
146         return peer->peer_reqs + idx;
147 }
148
149 inline void free_peer_req(struct peerd *peer, struct peer_req *pr)
150 {
151         xqindex idx = pr - peer->peer_reqs;
152         pr->req = NULL;
153         xq_append_head(&peer->free_reqs, idx, 1);
154 }
155
/* Accumulators for timing xseg_respond(); the code that updates them in
 * complete() is currently commented out. */
struct timeval resp_start, resp_end, resp_accum = {0, 0};
uint64_t responds = 0;

/*
 * Print the accumulated respond time (resp_accum) and the respond count
 * (responds) to stdout.
 *
 * tv_sec / tv_usec are cast explicitly so the arguments match the %lu
 * conversions regardless of how time_t and suseconds_t are defined.
 */
void get_responds_stats(){
	printf("Time waiting respond %lu.%06lu sec for %llu times.\n",
			(unsigned long) resp_accum.tv_sec,
			(unsigned long) resp_accum.tv_usec,
			(unsigned long long) responds);
}
163
164 //FIXME error check
165 void fail(struct peerd *peer, struct peer_req *pr)
166 {
167         struct xseg_request *req = pr->req;
168         uint32_t p;
169         XSEGLOG2(&lc, D, "failing req %u", (unsigned int) (pr - peer->peer_reqs));
170         req->state |= XS_FAILED;
171         //xseg_set_req_data(peer->xseg, pr->req, NULL);
172         p = xseg_respond(peer->xseg, req, pr->portno, X_ALLOC);
173         xseg_signal(peer->xseg, p);
174         free_peer_req(peer, pr);
175 #ifdef MT
176         wake_up_next_thread(peer);
177 #endif
178 }
179
180 //FIXME error check
181 void complete(struct peerd *peer, struct peer_req *pr)
182 {
183         struct xseg_request *req = pr->req;
184         uint32_t p;
185         req->state |= XS_SERVED;
186         //xseg_set_req_data(peer->xseg, pr->req, NULL);
187         //gettimeofday(&resp_start, NULL);
188         p = xseg_respond(peer->xseg, req, pr->portno, X_ALLOC);
189         //gettimeofday(&resp_end, NULL);
190         //responds++;
191         //timersub(&resp_end, &resp_start, &resp_end);
192         //timeradd(&resp_end, &resp_accum, &resp_accum);
193         //printf("xseg_signal: %u\n", p);
194         xseg_signal(peer->xseg, p);
195         free_peer_req(peer, pr);
196 #ifdef MT
197         wake_up_next_thread(peer);
198 #endif
199 }
200
201 static void handle_accepted(struct peerd *peer, struct peer_req *pr, 
202                                 struct xseg_request *req)
203 {
204         struct xseg_request *xreq = pr->req;
205         //assert xreq == req;
206         XSEGLOG2(&lc, D, "Handle accepted");
207         xreq->serviced = 0;
208         //xreq->state = XS_ACCEPTED;
209         pr->retval = 0;
210         dispatch(peer, pr, req, dispatch_accept);
211 }
212
213 static void handle_received(struct peerd *peer, struct peer_req *pr,
214                                 struct xseg_request *req)
215 {
216         //struct xseg_request *req = pr->req;
217         //assert req->state != XS_ACCEPTED;
218         XSEGLOG2(&lc, D, "Handle received \n");
219         dispatch(peer, pr, req, dispatch_receive);
220
221 }
/* Accumulators for timing xseg_submit(); the code that updates them in
 * submit_peer_req() is currently commented out. */
struct timeval sub_start, sub_end, sub_accum = {0, 0};
uint64_t submits = 0;

/*
 * Print the accumulated submit time (sub_accum) and the submit count
 * (submits) to stdout.
 *
 * tv_sec / tv_usec are cast explicitly so the arguments match the %lu
 * conversions regardless of how time_t and suseconds_t are defined.
 */
void get_submits_stats(){
	printf("Time waiting submit %lu.%06lu sec for %llu times.\n",
			(unsigned long) sub_accum.tv_sec,
			(unsigned long) sub_accum.tv_usec,
			(unsigned long long) submits);
}
229
230 int submit_peer_req(struct peerd *peer, struct peer_req *pr)
231 {
232         uint32_t ret;
233         struct xseg_request *req = pr->req;
234         // assert req->portno == peer->portno ?
235         //TODO small function with error checking
236         XSEGLOG2 (&lc, D, "submitting peer req %u\n", (unsigned int)(pr - peer->peer_reqs));
237         ret = xseg_set_req_data(peer->xseg, req, (void *)(pr));
238         if (ret < 0)
239                 return -1;
240         //printf("pr: %x , req_data: %x \n", pr, xseg_get_req_data(peer->xseg, req));
241         //gettimeofday(&sub_start, NULL);
242         ret = xseg_submit(peer->xseg, req, pr->portno, X_ALLOC);
243         //gettimeofday(&sub_end, NULL);
244         //submits++;
245         //timersub(&sub_end, &sub_start, &sub_end);
246         //timeradd(&sub_end, &sub_accum, &sub_accum);
247         if (ret == NoPort)
248                 return -1;
249         xseg_signal(peer->xseg, ret);
250         return 0;
251 }
252
253 static int check_ports(struct peerd *peer)
254 {
255         struct xseg *xseg = peer->xseg;
256         xport portno_start = peer->portno_start;
257         xport portno_end = peer->portno_end;
258         struct xseg_request *accepted, *received;
259         struct peer_req *pr;
260         xport i;
261         int  r, c = 0;
262
263         for (i = portno_start; i <= portno_end; i++) {
264                 accepted = NULL;
265                 received = NULL;
266                 pr = alloc_peer_req(peer);
267                 if (pr) {
268                         accepted = xseg_accept(xseg, i, X_NONBLOCK);
269                         if (accepted) {
270                                 pr->req = accepted;
271                                 pr->portno = i;
272                                 xseg_cancel_wait(xseg, i);
273                                 handle_accepted(peer, pr, accepted);
274                                 c = 1;
275                         }
276                         else {
277                                 free_peer_req(peer, pr);
278                         }
279                 }
280                 received = xseg_receive(xseg, i, X_NONBLOCK);
281                 if (received) {
282                         r =  xseg_get_req_data(xseg, received, (void **) &pr);
283                         if (r < 0 || !pr){
284                                 XSEGLOG2(&lc, W, "Received request with no pr data\n");
285                                 xport p = xseg_respond(peer->xseg, received, peer->portno_start, X_ALLOC);
286                                 if (p == NoPort){
287                                         XSEGLOG2(&lc, W, "Could not respond stale request");
288                                         xseg_put_request(xseg, received, portno_start);
289                                         continue;
290                                 } else {
291                                         xseg_signal(xseg, p);
292                                 }
293                         } else {
294                                 //maybe perform sanity check for pr
295                                 xseg_cancel_wait(xseg, i);
296                                 handle_received(peer, pr, received);
297                                 c = 1;
298                         }
299                 }
300         }
301
302         return c;
303 }
304
305 #ifdef MT
306 int thread_execute(struct peerd *peer, void (*func)(void *arg), void *arg)
307 {
308         struct thread *t = alloc_thread(peer);
309         if (t) {
310                 t->func = func;
311                 t->arg = arg;
312                 wake_up_thread(t);
313                 return 0;
314         } else
315                 // we could hijack a thread
316                 return -1;
317 }
318
319 static void* thread_loop(void *arg)
320 {
321         struct thread *t = (struct thread *) arg;
322         struct peerd *peer = t->peer;
323         struct xseg *xseg = peer->xseg;
324         xport portno_start = peer->portno_start;
325         xport portno_end = peer->portno_end;
326         pid_t pid =syscall(SYS_gettid);
327         uint64_t loops;
328         uint64_t threshold=1000/(1 + portno_end - portno_start);
329         
330         XSEGLOG2(&lc, D, "thread %u\n",  (unsigned int) (t- peer->thread));
331
332         XSEGLOG2(&lc, I, "Thread %u has tid %u.\n", (unsigned int) (t- peer->thread), pid);
333         xseg_init_local_signal(xseg, peer->portno_start);
334         for (;;) {
335                 if (t->func) {
336                         XSEGLOG2(&lc, D, "Thread %u executes function\n", (unsigned int) (t- peer->thread));
337                         xseg_cancel_wait(xseg, peer->portno_start);
338                         t->func(t->arg);
339                         t->func = NULL;
340                         t->arg = NULL;
341                         continue;
342                 }
343
344                 for(loops= threshold; loops > 0; loops--) {
345                         if (loops == 1)
346                                 xseg_prepare_wait(xseg, peer->portno_start);
347                         if (check_ports(peer))
348                                 loops = threshold;
349                 }
350                 XSEGLOG2(&lc, I, "Thread %u goes to sleep\n", (unsigned int) (t- peer->thread));
351                 xseg_wait_signal(xseg, 10000000UL);
352                 xseg_cancel_wait(xseg, peer->portno_start);
353                 XSEGLOG2(&lc, I, "Thread %u woke up\n", (unsigned int) (t- peer->thread));
354         }
355         return NULL;
356 }
357
358 int peerd_start_threads(struct peerd *peer)
359 {
360         int i;
361         uint32_t nr_threads = peer->nr_threads;
362         //TODO err check
363         for (i = 0; i < nr_threads; i++) {
364                 peer->thread[i].peer = peer;
365                 pthread_cond_init(&peer->thread[i].cond,NULL);
366                 pthread_mutex_init(&peer->thread[i].lock, NULL);
367                 pthread_create(&peer->thread[i].tid, NULL, thread_loop, (void *)(peer->thread + i));
368                 peer->thread[i].func = NULL;
369                 peer->thread[i].arg = NULL;
370
371         }
372         return 0;
373 }
374 #endif
375
/*
 * Placeholder for handing a blocking request over to the defer port.
 * Currently a no-op. The intended steps, kept from the original sketch:
 *
 *	assert canDefer(peer);
 *	xseg_submit(peer->xseg, peer->defer_portno, pr->req);
 *	xseg_signal(peer->xseg, peer->defer_portno);
 *	free_peer_req(peer, pr);
 */
void defer_request(struct peerd *peer, struct peer_req *pr)
{
	(void) peer;
	(void) pr;
}
383
384 static int peerd_loop(struct peerd *peer) 
385 {
386 #ifdef MT
387         if (peer->interactive_func)
388                 peer->interactive_func();
389         for (;;) {
390                 pthread_join(peer->thread[0].tid, NULL);
391         }
392 #else
393         struct xseg *xseg = peer->xseg;
394         xport portno_start = peer->portno_start;
395         xport portno_end = peer->portno_end;
396         uint64_t threshold=1000/(1 + portno_end - portno_start);
397         pid_t pid =syscall(SYS_gettid);
398         uint64_t loops;
399         
400         XSEGLOG2(&lc, I, "Peer has tid %u.\n", pid);
401         xseg_init_local_signal(xseg, peer->portno_start);
402         for (;;) {
403                 for(loops= threshold; loops > 0; loops--) {
404                         if (loops == 1)
405                                 xseg_prepare_wait(xseg, peer->portno_start);
406                         if (check_ports(peer))
407                                 loops = threshold;
408                 }
409 #ifdef ST_THREADS
410                 if (ta){
411                         st_sleep(0);
412                 } else {
413 #endif
414                         XSEGLOG2(&lc, I, "Peer goes to sleep\n");
415                         xseg_wait_signal(xseg, 10000000UL);
416                         xseg_cancel_wait(xseg, peer->portno_start);
417                         XSEGLOG2(&lc, I, "Peer woke up\n");
418 #ifdef ST_THREADS
419                 }
420 #endif
421         }
422         xseg_quit_local_signal(xseg, peer->portno_start);
423 #endif
424         return 0;
425 }
426
427 static struct xseg *join(char *spec)
428 {
429         struct xseg_config config;
430         struct xseg *xseg;
431
432         (void)xseg_parse_spec(spec, &config);
433         xseg = xseg_join(config.type, config.name, PEER_TYPE, NULL);
434         if (xseg)
435                 return xseg;
436
437         (void)xseg_create(&config);
438         return xseg_join(config.type, config.name, PEER_TYPE, NULL);
439 }
440
441 static struct peerd* peerd_init(uint32_t nr_ops, char* spec, long portno_start,
442                         long portno_end, uint32_t nr_threads, uint32_t defer_portno)
443 {
444         int i;
445         struct peerd *peer;
446         struct xseg_port *port;
447
448 #ifdef ST_THREADS
449         st_init();
450 #endif
451         peer = malloc(sizeof(struct peerd));
452         if (!peer) {
453                 perror("malloc");
454                 return NULL;
455         }
456         peer->nr_ops = nr_ops;
457         peer->defer_portno = defer_portno;
458 #ifdef MT
459         peer->nr_threads = nr_threads;
460         peer->thread = calloc(nr_threads, sizeof(struct thread));
461         if (!peer->thread)
462                 goto malloc_fail;
463 #endif
464         peer->peer_reqs = calloc(nr_ops, sizeof(struct peer_req));
465         if (!peer->peer_reqs){
466 malloc_fail:
467                 perror("malloc");
468                 return NULL;
469         }
470
471         if (!xq_alloc_seq(&peer->free_reqs, nr_ops, nr_ops))
472                 goto malloc_fail;
473 #ifdef MT
474         if (!xq_alloc_empty(&peer->threads, nr_threads))
475                 goto malloc_fail;
476 #endif
477         if (xseg_initialize()){
478                 printf("cannot initialize library\n");
479                 return NULL;
480         }
481         peer->xseg = join(spec);
482         if (!peer->xseg) 
483                 return NULL;
484
485         peer->portno_start = (xport) portno_start;
486         peer->portno_end= (xport) portno_end;
487         port = xseg_bind_port(peer->xseg, peer->portno_start, NULL);
488         if (!port){
489                 printf("cannot bind to port %ld\n", peer->portno_start);
490                 return NULL;
491         }
492
493         xport p;
494         for (p = peer->portno_start + 1; p <= peer->portno_end; p++) {
495                 struct xseg_port *tmp;
496                 tmp = xseg_bind_port(peer->xseg, p, (void *)xseg_get_signal_desc(peer->xseg, port));
497                 if (!tmp){
498                         printf("cannot bind to port %ld\n", p);
499                         return NULL;
500                 }
501         }
502
503         printf("Peer on ports  %u-%u\n", peer->portno_start,
504                         peer->portno_end);
505
506         for (i = 0; i < nr_ops; i++) {
507                 peer->peer_reqs[i].peer = peer;
508                 peer->peer_reqs[i].req = NULL;
509                 peer->peer_reqs[i].retval = 0;
510                 peer->peer_reqs[i].priv = NULL;
511                 peer->peer_reqs[i].portno = NoPort;
512 #ifdef ST_THREADS
513                 peer->peer_reqs[i].cond = st_cond_new(); //FIXME err check
514 #endif
515         }
516 #ifdef MT
517         peer->interactive_func = NULL;
518 #endif
519         return peer;
520 }
521
522
523 int main(int argc, char *argv[])
524 {
525         struct peerd *peer = NULL;
526         //parse args
527         char *spec = "";
528         int i, r;
529         long portno_start = -1, portno_end = -1, portno = -1;
530         //set defaults here
531         uint32_t nr_ops = 16;
532         uint32_t nr_threads = 16 ;
533         unsigned int debug_level = 0;
534         uint32_t defer_portno = NoPort;
535         char *logfile = NULL;
536
537         //capture here -g spec, -n nr_ops, -p portno, -t nr_threads -v verbose level
538         // -dp xseg_portno to defer blocking requests
539         // -l log file ?
540         //TODO print messages on arg parsing error
541         
542         for (i = 1; i < argc; i++) {
543                 if (!strcmp(argv[i], "-g") && i + 1 < argc) {
544                         spec = argv[i+1];
545                         i += 1;
546                         continue;
547                 }
548
549                 if (!strcmp(argv[i], "-sp") && i + 1 < argc) {
550                         portno_start = strtoul(argv[i+1], NULL, 10);
551                         i += 1;
552                         continue;
553                 }
554                 
555                 if (!strcmp(argv[i], "-ep") && i + 1 < argc) {
556                         portno_end = strtoul(argv[i+1], NULL, 10);
557                         i += 1;
558                         continue;
559                 }
560
561                 if (!strcmp(argv[i], "-p") && i + 1 < argc) {
562                         portno = strtoul(argv[i+1], NULL, 10);
563                         i += 1;
564                         continue;
565                 }
566
567                 if (!strcmp(argv[i], "-n") && i + 1 < argc) {
568                         nr_ops = strtoul(argv[i+1], NULL, 10);
569                         i += 1;
570                         continue;
571                 }
572                 if (!strcmp(argv[i], "-v") && i + 1 < argc ) {
573                         debug_level = atoi(argv[i+1]);
574                         i += 1;
575                         continue;
576                 }
577                 if (!strcmp(argv[i], "-t") && i + 1 < argc ) {
578                         nr_threads = strtoul(argv[i+1], NULL, 10);
579                         i += 1;
580                         continue;
581                 }
582                 if (!strcmp(argv[i], "-dp") && i + 1 < argc ) {
583                         defer_portno = strtoul(argv[i+1], NULL, 10);
584                         i += 1;
585                         continue;
586                 }
587                 if (!strcmp(argv[i], "-l") && i + 1 < argc ) {
588                         logfile = argv[i+1];
589                         i += 1;
590                         continue;
591                 }
592
593         }
594         init_logctx(&lc, argv[0], debug_level, logfile);
595         XSEGLOG2(&lc, D, "Main thread has tid %ld.\n", syscall(SYS_gettid));
596         
597         //TODO perform argument sanity checks
598         verbose = debug_level;
599         if (portno != -1) {
600                 portno_start = portno;
601                 portno_end = portno;
602         }
603
604         //TODO err check
605         peer = peerd_init(nr_ops, spec, portno_start, portno_end, nr_threads, defer_portno);
606         if (!peer)
607                 return -1;
608         r = custom_peer_init(peer, argc, argv);
609         if (r < 0)
610                 return -1;
611 #ifdef MT
612         peerd_start_threads(peer);
613 #endif
614
615 #ifdef ST_THREADS
616         st_thread_t st = st_thread_create(peerd_loop, peer, 1, 0);
617         return st_thread_join(st, NULL);
618 #else
619         return peerd_loop(peer);
620 #endif
621 }