rewrite mt-mapperd based on st threads
[archipelago] / xseg / peers / user / mpeer.c
#define _GNU_SOURCE
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/types.h>
#include <unistd.h>
#include <xseg/xseg.h>
#include <pthread.h>
#include <mpeer.h>
#include <sys/syscall.h>
#include <sys/time.h>
#include <signal.h>
12
13 unsigned int verbose = 0;
14 struct log_ctx lc;
15
/*
 * Per-worker bookkeeping. One entry exists for every worker thread in
 * peer->thread[]; peerd_start_threads() fills it in and thread_loop()
 * receives a pointer to it as its argument.
 */
struct thread {
	struct peerd *peer;		/* peer this worker belongs to */
	pthread_t tid;			/* handle filled in by pthread_create() */
	pthread_cond_t cond;		/* signalled by __wake_up_thread() */
	pthread_mutex_t lock;		/* held while signalling cond */
	void (*func)(void *arg);	/* one-shot job run by thread_loop(), or NULL */
	void *arg;			/* argument handed to func */
};
24
25
26 inline int canDefer(struct peerd *peer)
27 {
28         return !(peer->defer_portno == NoPort);
29 }
30
31 void print_req(struct xseg *xseg, struct xseg_request *req)
32 {
33         char target[64], data[64];
34         char *req_target, *req_data;
35         unsigned int end = (req->targetlen> 63) ? 63 : req->targetlen;
36         req_target = xseg_get_target(xseg, req);
37         req_data = xseg_get_data(xseg, req);
38
39         if (1) {
40                 strncpy(target, req_target, end);
41                 target[end] = 0;
42                 strncpy(data, req_data, 63);
43                 data[63] = 0;
44                 printf("req id:%lu, op:%u %llu:%lu serviced: %lu, reqstate: %u\n"
45                                 "src: %u, st: %u, dst: %u dt: %u\n"
46                                 "target[%u]:'%s', data[%llu]:\n%s------------------\n\n",
47                                 (unsigned long)(req),
48                                 (unsigned int)req->op,
49                                 (unsigned long long)req->offset,
50                                 (unsigned long)req->size,
51                                 (unsigned long)req->serviced,
52                                 (unsigned int)req->state,
53                                 (unsigned int)req->src_portno,
54                                 (unsigned int)req->src_transit_portno,
55                                 (unsigned int)req->dst_portno,
56                                 (unsigned int)req->dst_transit_portno,
57                                 (unsigned int)req->targetlen, target,
58                                 (unsigned long long)req->datalen, data);
59         }
60 }
61 void log_pr(char *msg, struct peer_req *pr)
62 {
63         char target[64], data[64];
64         char *req_target, *req_data;
65         struct peerd *peer = pr->peer;
66         struct xseg *xseg = pr->peer->xseg;
67         req_target = xseg_get_target(xseg, pr->req);
68         req_data = xseg_get_data(xseg, pr->req);
69         /* null terminate name in case of req->target is less than 63 characters,
70          * and next character after name (aka first byte of next buffer) is not
71          * null
72          */
73         unsigned int end = (pr->req->targetlen> 63) ? 63 : pr->req->targetlen;
74         if (verbose) {
75                 strncpy(target, req_target, end);
76                 target[end] = 0;
77                 strncpy(data, req_data, 63);
78                 data[63] = 0;
79                 printf("%s: req id:%u, op:%u %llu:%lu serviced: %lu, retval: %lu, reqstate: %u\n"
80                                 "target[%u]:'%s', data[%llu]:\n%s------------------\n\n",
81                                 msg,
82                                 (unsigned int)(pr - peer->peer_reqs),
83                                 (unsigned int)pr->req->op,
84                                 (unsigned long long)pr->req->offset,
85                                 (unsigned long)pr->req->size,
86                                 (unsigned long)pr->req->serviced,
87                                 (unsigned long)pr->retval,
88                                 (unsigned int)pr->req->state,
89                                 (unsigned int)pr->req->targetlen, target,
90                                 (unsigned long long)pr->req->datalen, data);
91         }
92 }
93
94 inline struct peer_req *alloc_peer_req(struct peerd *peer)
95 {
96         xqindex idx = xq_pop_head(&peer->free_reqs, 1);
97         if (idx == Noneidx)
98                 return NULL;
99         return peer->peer_reqs + idx;
100 }
101
102 inline void free_peer_req(struct peerd *peer, struct peer_req *pr)
103 {
104         xqindex idx = pr - peer->peer_reqs;
105         pr->req = NULL;
106         xq_append_head(&peer->free_reqs, idx, 1);
107 }
108
109 inline static struct thread* alloc_thread(struct peerd *peer)
110 {
111         xqindex idx = xq_pop_head(&peer->threads, 1);
112         if (idx == Noneidx)
113                 return NULL;
114         return peer->thread + idx;
115 }
116
117 inline static void free_thread(struct peerd *peer, struct thread *t)
118 {
119         xqindex idx = t - peer->thread;
120         xq_append_head(&peer->threads, idx, 1);
121 }
122
123
124 inline static void __wake_up_thread(struct thread *t)
125 {
126         pthread_mutex_lock(&t->lock);
127         pthread_cond_signal(&t->cond);
128         pthread_mutex_unlock(&t->lock);
129 }
130
/* Wake the given worker; a NULL thread is silently ignored. */
inline static void wake_up_thread(struct thread* t)
{
	if (!t)
		return;
	__wake_up_thread(t);
}
137
138 inline static int wake_up_next_thread(struct peerd *peer)
139 {
140         //struct thread *t = alloc_thread(peer);
141         //wake_up_thread(t);
142         //return t;
143         return (xseg_signal(peer->xseg, peer->portno_start));
144 }
145
/* Accumulators for time spent in xseg_respond(); the code that would update
 * them is currently commented out in complete(). */
struct timeval resp_start, resp_end, resp_accum = {0, 0};
uint64_t responds = 0;

/*
 * Print the accumulated respond time and the number of responds to stdout.
 *
 * tv_sec/tv_usec are cast to unsigned long so the arguments match the %lu
 * conversions regardless of how time_t/suseconds_t are defined.
 */
void get_responds_stats(){
		printf("Time waiting respond %lu.%06lu sec for %llu times.\n",
				(unsigned long) resp_accum.tv_sec,
				(unsigned long) resp_accum.tv_usec,
				(long long unsigned int) responds);
}
153
154 //FIXME error check
155 void fail(struct peerd *peer, struct peer_req *pr)
156 {
157         struct xseg_request *req = pr->req;
158         uint32_t p;
159         XSEGLOG2(&lc, D, "failing req %u", (unsigned int) (pr - peer->peer_reqs));
160         req->state |= XS_FAILED;
161         //xseg_set_req_data(peer->xseg, pr->req, NULL);
162         p = xseg_respond(peer->xseg, req, pr->portno, X_ALLOC);
163         xseg_signal(peer->xseg, p);
164         free_peer_req(peer, pr);
165         wake_up_next_thread(peer);
166 }
167
168 //FIXME error check
169 void complete(struct peerd *peer, struct peer_req *pr)
170 {
171         struct xseg_request *req = pr->req;
172         uint32_t p;
173         req->state |= XS_SERVED;
174         //xseg_set_req_data(peer->xseg, pr->req, NULL);
175         //gettimeofday(&resp_start, NULL);
176         p = xseg_respond(peer->xseg, req, pr->portno, X_ALLOC);
177         //gettimeofday(&resp_end, NULL);
178         //responds++;
179         //timersub(&resp_end, &resp_start, &resp_end);
180         //timeradd(&resp_end, &resp_accum, &resp_accum);
181         //printf("xseg_signal: %u\n", p);
182         xseg_signal(peer->xseg, p);
183         free_peer_req(peer, pr);
184         wake_up_next_thread(peer);
185 }
186
187 void pending(struct peerd *peer, struct peer_req *pr)
188 {
189                 pr->req->state = XS_PENDING;
190 }
191
192 static void handle_accepted(struct peerd *peer, struct peer_req *pr, 
193                                 struct xseg_request *req)
194 {
195         struct xseg_request *xreq = pr->req;
196         //assert xreq == req;
197         XSEGLOG2(&lc, D, "Handle accepted");
198         xreq->serviced = 0;
199         //xreq->state = XS_ACCEPTED;
200         pr->retval = 0;
201         dispatch(peer, pr, req, dispatch_accept);
202 }
203
204 static void handle_received(struct peerd *peer, struct peer_req *pr,
205                                 struct xseg_request *req)
206 {
207         //struct xseg_request *req = pr->req;
208         //assert req->state != XS_ACCEPTED;
209         XSEGLOG2(&lc, D, "Handle received \n");
210         dispatch(peer, pr, req, dispatch_receive);
211
212 }
/* Accumulators for time spent in xseg_submit(); the code that would update
 * them is currently commented out in submit_peer_req(). */
struct timeval sub_start, sub_end, sub_accum = {0, 0};
uint64_t submits = 0;

/*
 * Print the accumulated submit time and the number of submits to stdout.
 *
 * tv_sec/tv_usec are cast to unsigned long so the arguments match the %lu
 * conversions regardless of how time_t/suseconds_t are defined.
 */
void get_submits_stats(){
		printf("Time waiting submit %lu.%06lu sec for %llu times.\n",
				(unsigned long) sub_accum.tv_sec,
				(unsigned long) sub_accum.tv_usec,
				(long long unsigned int) submits);
}
220
221 int submit_peer_req(struct peerd *peer, struct peer_req *pr)
222 {
223         uint32_t ret;
224         struct xseg_request *req = pr->req;
225         // assert req->portno == peer->portno ?
226         //TODO small function with error checking
227         XSEGLOG2 (&lc, D, "submitting peer req %u\n", (unsigned int)(pr - peer->peer_reqs));
228         ret = xseg_set_req_data(peer->xseg, req, (void *)(pr));
229         if (ret < 0)
230                 return -1;
231         //printf("pr: %x , req_data: %x \n", pr, xseg_get_req_data(peer->xseg, req));
232         //gettimeofday(&sub_start, NULL);
233         ret = xseg_submit(peer->xseg, req, pr->portno, X_ALLOC);
234         //gettimeofday(&sub_end, NULL);
235         //submits++;
236         //timersub(&sub_end, &sub_start, &sub_end);
237         //timeradd(&sub_end, &sub_accum, &sub_accum);
238         if (ret == NoPort)
239                 return -1;
240         xseg_signal(peer->xseg, ret);
241         return 0;
242 }
243
244 int thread_execute(struct peerd *peer, void (*func)(void *arg), void *arg)
245 {
246         struct thread *t = alloc_thread(peer);
247         if (t) {
248                 t->func = func;
249                 t->arg = arg;
250                 wake_up_thread(t);
251                 return 0;
252         } else
253                 // we could hijack a thread
254                 return -1;
255 }
256
257 static void* thread_loop(void *arg)
258 {
259         struct thread *t = (struct thread *) arg;
260         struct peerd *peer = t->peer;
261         struct xseg *xseg = peer->xseg;
262         xport portno_start = peer->portno_start;
263         xport portno_end = peer->portno_end;
264         struct peer_req *pr;
265         uint64_t threshold=1;
266         pid_t pid =syscall(SYS_gettid);
267         uint64_t loops;
268         struct xseg_request *accepted, *received;
269         int r;
270         int change;
271         xport i;
272                 
273         XSEGLOG2(&lc, D, "thread %u\n",  (unsigned int) (t- peer->thread));
274
275         XSEGLOG2(&lc, I, "Thread %u has tid %u.\n", (unsigned int) (t- peer->thread), pid);
276         xseg_init_local_signal(xseg, peer->portno_start);
277         for (;;) {
278                 if (t->func) {
279                         XSEGLOG2(&lc, D, "Thread %u executes function\n", (unsigned int) (t- peer->thread));
280                         xseg_cancel_wait(xseg, peer->portno_start);
281                         t->func(t->arg);
282                         t->func = NULL;
283                         t->arg = NULL;
284                         continue;
285                 }
286
287                 for(loops= threshold; loops > 0; loops--) {
288                         do {
289                                 if (loops == 1)
290                                         xseg_prepare_wait(xseg, peer->portno_start);
291                                 change = 0;
292                                 for (i = portno_start; i <= portno_end; i++) {
293                                         accepted = NULL;
294                                         received = NULL;
295                                         pr = alloc_peer_req(peer);
296                                         if (pr) {
297                                                 accepted = xseg_accept(xseg, i, X_NONBLOCK);
298                                                 if (accepted) {
299                                                         XSEGLOG2(&lc, D, "Thread %u accepted\n", (unsigned int) (t- peer->thread));
300                                                         pr->req = accepted;
301                                                         pr->portno = i;
302                                                         xseg_cancel_wait(xseg, i);
303                                                         wake_up_next_thread(peer);
304                                                         handle_accepted(peer, pr, accepted);
305                                                         change = 1;
306                                                 }
307                                                 else {
308                                                         free_peer_req(peer, pr);
309                                                 }
310                                         }
311                                         received = xseg_receive(xseg, i, X_NONBLOCK);
312                                         if (received) {
313                                                 //printf("received req id: %u\n", received - xseg->requests);
314                                                 //print_req(peer->xseg, received);
315                                                 r =  xseg_get_req_data(xseg, received, (void **) &pr);
316                                                 if (r < 0 || !pr){
317                                                         //FIXME what to do here ?
318                                                         XSEGLOG2(&lc, W, "Received request with no pr data\n");
319                                                         xseg_respond(peer->xseg, received, peer->portno_start, X_ALLOC);
320                                                         //FIXME if fails, put req
321                                                 }
322                                                 xseg_cancel_wait(xseg, i);
323                                                 wake_up_next_thread(peer);
324                                                 handle_received(peer, pr, received);
325                                                 change = 1;
326                                         }
327                                 }
328                                 if (change)
329                                         loops = threshold;
330                         }while (change);
331                 }
332                 XSEGLOG2(&lc, I, "Thread %u goes to sleep\n", (unsigned int) (t- peer->thread));
333                 xseg_wait_signal(xseg, 10000000UL);
334                 xseg_cancel_wait(xseg, peer->portno_start);
335                 XSEGLOG2(&lc, I, "Thread %u woke up\n", (unsigned int) (t- peer->thread));
336         }
337         return NULL;
338 }
339
/*
 * Placeholder for deferring a request to peer->defer_portno.
 *
 * Currently a no-op: the intended implementation (submit the request to
 * the defer port, signal that port, release the peer request) is not
 * enabled. Callers would be expected to check canDefer(peer) first.
 */
void defer_request(struct peerd *peer, struct peer_req *pr)
{
	(void)peer;
	(void)pr;
}
347
348 static int peerd_loop(struct peerd *peer) 
349 {
350         if (peer->interactive_func)
351                 peer->interactive_func();
352         for (;;) {
353                 pthread_join(peer->thread[0].tid, NULL);
354         }
355         return 0;
356 }
357
358 static struct xseg *join(char *spec)
359 {
360         struct xseg_config config;
361         struct xseg *xseg;
362
363         (void)xseg_parse_spec(spec, &config);
364         xseg = xseg_join(config.type, config.name, "pthread", NULL);
365         if (xseg)
366                 return xseg;
367
368         (void)xseg_create(&config);
369         return xseg_join(config.type, config.name, "pthread", NULL);
370 }
371
372 int peerd_start_threads(struct peerd *peer)
373 {
374         int i;
375         uint32_t nr_threads = peer->nr_threads;
376         //TODO err check
377         for (i = 0; i < nr_threads; i++) {
378                 peer->thread[i].peer = peer;
379                 pthread_cond_init(&peer->thread[i].cond,NULL);
380                 pthread_mutex_init(&peer->thread[i].lock, NULL);
381                 pthread_create(&peer->thread[i].tid, NULL, thread_loop, (void *)(peer->thread + i));
382                 peer->thread[i].func = NULL;
383                 peer->thread[i].arg = NULL;
384
385         }
386         return 0;
387 }
388
389 static struct peerd* peerd_init(uint32_t nr_ops, char* spec, long portno_start,
390                         long portno_end, uint32_t nr_threads, uint32_t defer_portno)
391 {
392         int i;
393         struct peerd *peer;
394         struct xseg_port *port;
395         peer = malloc(sizeof(struct peerd));
396         if (!peer) {
397                 perror("malloc");
398                 return NULL;
399         }
400         peer->nr_ops = nr_ops;
401         peer->defer_portno = defer_portno;
402         peer->nr_threads = nr_threads;
403
404         peer->thread = calloc(nr_threads, sizeof(struct thread));
405         if (!peer->thread)
406                 goto malloc_fail;
407         peer->peer_reqs = calloc(nr_ops, sizeof(struct peer_req));
408         if (!peer->peer_reqs){
409 malloc_fail:
410                 perror("malloc");
411                 return NULL;
412         }
413
414         if (!xq_alloc_seq(&peer->free_reqs, nr_ops, nr_ops))
415                 goto malloc_fail;
416         if (!xq_alloc_empty(&peer->threads, nr_threads))
417                 goto malloc_fail;
418
419         if (xseg_initialize()){
420                 printf("cannot initialize library\n");
421                 return NULL;
422         }
423         peer->xseg = join(spec);
424         if (!peer->xseg) 
425                 return NULL;
426
427         peer->portno_start = (xport) portno_start;
428         peer->portno_end= (xport) portno_end;
429         port = xseg_bind_port(peer->xseg, peer->portno_start, NULL);
430         if (!port){
431                 printf("cannot bind to port %ld\n", peer->portno_start);
432                 return NULL;
433         }
434
435         xport p;
436         for (p = peer->portno_start + 1; p <= peer->portno_end; p++) {
437                 struct xseg_port *tmp;
438                 tmp = xseg_bind_port(peer->xseg, p, (void *)xseg_get_signal_desc(peer->xseg, port));
439                 if (!tmp){
440                         printf("cannot bind to port %ld\n", p);
441                         return NULL;
442                 }
443         }
444
445         printf("Peer on ports  %u-%u\n", peer->portno_start,
446                         peer->portno_end);
447
448         for (i = 0; i < nr_ops; i++) {
449                 peer->peer_reqs[i].peer = peer;
450                 peer->peer_reqs[i].req = NULL;
451                 peer->peer_reqs[i].retval = 0;
452                 peer->peer_reqs[i].priv = NULL;
453                 peer->peer_reqs[i].portno = NoPort;
454         }
455         peer->interactive_func = NULL;
456         return peer;
457 }
458
459
460 int main(int argc, char *argv[])
461 {
462         struct peerd *peer = NULL;
463         //parse args
464         char *spec = "";
465         int i, r;
466         long portno_start = -1, portno_end = -1, portno = -1;
467         //set defaults here
468         uint32_t nr_ops = 16;
469         uint32_t nr_threads = 16 ;
470         unsigned int debug_level = 0;
471         uint32_t defer_portno = NoPort;
472         char *logfile = NULL;
473
474         //capture here -g spec, -n nr_ops, -p portno, -t nr_threads -v verbose level
475         // -dp xseg_portno to defer blocking requests
476         // -l log file ?
477         //TODO print messages on arg parsing error
478         
479         for (i = 1; i < argc; i++) {
480                 if (!strcmp(argv[i], "-g") && i + 1 < argc) {
481                         spec = argv[i+1];
482                         i += 1;
483                         continue;
484                 }
485
486                 if (!strcmp(argv[i], "-sp") && i + 1 < argc) {
487                         portno_start = strtoul(argv[i+1], NULL, 10);
488                         i += 1;
489                         continue;
490                 }
491                 
492                 if (!strcmp(argv[i], "-ep") && i + 1 < argc) {
493                         portno_end = strtoul(argv[i+1], NULL, 10);
494                         i += 1;
495                         continue;
496                 }
497
498                 if (!strcmp(argv[i], "-p") && i + 1 < argc) {
499                         portno = strtoul(argv[i+1], NULL, 10);
500                         i += 1;
501                         continue;
502                 }
503
504                 if (!strcmp(argv[i], "-n") && i + 1 < argc) {
505                         nr_ops = strtoul(argv[i+1], NULL, 10);
506                         i += 1;
507                         continue;
508                 }
509                 if (!strcmp(argv[i], "-v") && i + 1 < argc ) {
510                         debug_level = atoi(argv[i+1]);
511                         i += 1;
512                         continue;
513                 }
514                 if (!strcmp(argv[i], "-t") && i + 1 < argc ) {
515                         nr_threads = strtoul(argv[i+1], NULL, 10);
516                         i += 1;
517                         continue;
518                 }
519                 if (!strcmp(argv[i], "-dp") && i + 1 < argc ) {
520                         defer_portno = strtoul(argv[i+1], NULL, 10);
521                         i += 1;
522                         continue;
523                 }
524                 if (!strcmp(argv[i], "-l") && i + 1 < argc ) {
525                         logfile = argv[i+1];
526                         i += 1;
527                         continue;
528                 }
529
530         }
531         init_logctx(&lc, argv[0], debug_level, logfile);
532         XSEGLOG2(&lc, D, "Main thread has tid %ld.\n", syscall(SYS_gettid));
533         
534         //TODO perform argument sanity checks
535         verbose = debug_level;
536         if (portno != -1) {
537                 portno_start = portno;
538                 portno_end = portno;
539         }
540
541         //TODO err check
542         peer = peerd_init(nr_ops, spec, portno_start, portno_end, nr_threads, defer_portno);
543         if (!peer)
544                 return -1;
545         r = custom_peer_init(peer, argc, argv);
546         if (r < 0)
547                 return -1;
548         peerd_start_threads(peer);
549         return peerd_loop(peer);
550 }