fix mt-mapperd according to the new blocker locking functionality
[archipelago] / xseg / peers / user / mpeer.c
1 #define _GNU_SOURCE
2 #include <stdio.h>
3 #include <stdlib.h>
4 #include <sys/types.h>
5 #include <unistd.h>
6 #include <xseg/xseg.h>
7 #include <pthread.h>
8 #include <mpeer.h>
9 #include <sys/syscall.h>
10 #include <sys/time.h>
11 #include <signal.h>
12
13 unsigned int verbose = 0;
14 struct log_ctx lc;
15
/* Per-worker bookkeeping; one entry per slot of peer->thread. */
struct thread {
        struct peerd *peer;             /* owning peer, set by peerd_start_threads() */
        pthread_t tid;                  /* thread id filled in by pthread_create() */
        pthread_cond_t cond;            /* signalled by __wake_up_thread() */
        pthread_mutex_t lock;           /* held while signalling cond */
        void (*func)(void *arg);        /* pending job; thread_loop() runs it, then resets it to NULL */
        void *arg;                      /* argument handed to func */
};
24
25
26 inline static struct thread* alloc_thread(struct peerd *peer)
27 {
28         xqindex idx = xq_pop_head(&peer->threads, 1);
29         if (idx == Noneidx)
30                 return NULL;
31         return peer->thread + idx;
32 }
33
34 inline static void free_thread(struct peerd *peer, struct thread *t)
35 {
36         xqindex idx = t - peer->thread;
37         xq_append_head(&peer->threads, idx, 1);
38 }
39
40
41 inline static void __wake_up_thread(struct thread *t)
42 {
43         pthread_mutex_lock(&t->lock);
44         pthread_cond_signal(&t->cond);
45         pthread_mutex_unlock(&t->lock);
46 }
47
/* NULL-tolerant wrapper around __wake_up_thread(): does nothing when t is NULL. */
inline static void wake_up_thread(struct thread* t)
{
        if (!t)
                return;
        __wake_up_thread(t);
}
54
55 inline static int wake_up_next_thread(struct peerd *peer)
56 {
57         //struct thread *t = alloc_thread(peer);
58         //wake_up_thread(t);
59         //return t;
60         return (xseg_signal(peer->xseg, peer->portno_start));
61 }
62
63 inline int canDefer(struct peerd *peer)
64 {
65         return !(peer->defer_portno == NoPort);
66 }
67
68 void print_req(struct xseg *xseg, struct xseg_request *req)
69 {
70         char target[64], data[64];
71         char *req_target, *req_data;
72         unsigned int end = (req->targetlen> 63) ? 63 : req->targetlen;
73         req_target = xseg_get_target(xseg, req);
74         req_data = xseg_get_data(xseg, req);
75
76         if (1) {
77                 strncpy(target, req_target, end);
78                 target[end] = 0;
79                 strncpy(data, req_data, 63);
80                 data[63] = 0;
81                 printf("req id:%lu, op:%u %llu:%lu serviced: %lu, reqstate: %u\n"
82                                 "src: %u, st: %u, dst: %u dt: %u\n"
83                                 "target[%u]:'%s', data[%llu]:\n%s------------------\n\n",
84                                 (unsigned long)(req),
85                                 (unsigned int)req->op,
86                                 (unsigned long long)req->offset,
87                                 (unsigned long)req->size,
88                                 (unsigned long)req->serviced,
89                                 (unsigned int)req->state,
90                                 (unsigned int)req->src_portno,
91                                 (unsigned int)req->src_transit_portno,
92                                 (unsigned int)req->dst_portno,
93                                 (unsigned int)req->dst_transit_portno,
94                                 (unsigned int)req->targetlen, target,
95                                 (unsigned long long)req->datalen, data);
96         }
97 }
98 void log_pr(char *msg, struct peer_req *pr)
99 {
100         char target[64], data[64];
101         char *req_target, *req_data;
102         struct peerd *peer = pr->peer;
103         struct xseg *xseg = pr->peer->xseg;
104         req_target = xseg_get_target(xseg, pr->req);
105         req_data = xseg_get_data(xseg, pr->req);
106         /* null terminate name in case of req->target is less than 63 characters,
107          * and next character after name (aka first byte of next buffer) is not
108          * null
109          */
110         unsigned int end = (pr->req->targetlen> 63) ? 63 : pr->req->targetlen;
111         if (verbose) {
112                 strncpy(target, req_target, end);
113                 target[end] = 0;
114                 strncpy(data, req_data, 63);
115                 data[63] = 0;
116                 printf("%s: req id:%u, op:%u %llu:%lu serviced: %lu, retval: %lu, reqstate: %u\n"
117                                 "target[%u]:'%s', data[%llu]:\n%s------------------\n\n",
118                                 msg,
119                                 (unsigned int)(pr - peer->peer_reqs),
120                                 (unsigned int)pr->req->op,
121                                 (unsigned long long)pr->req->offset,
122                                 (unsigned long)pr->req->size,
123                                 (unsigned long)pr->req->serviced,
124                                 (unsigned long)pr->retval,
125                                 (unsigned int)pr->req->state,
126                                 (unsigned int)pr->req->targetlen, target,
127                                 (unsigned long long)pr->req->datalen, data);
128         }
129 }
130
131 inline struct peer_req *alloc_peer_req(struct peerd *peer)
132 {
133         xqindex idx = xq_pop_head(&peer->free_reqs, 1);
134         if (idx == Noneidx)
135                 return NULL;
136         return peer->peer_reqs + idx;
137 }
138
139 inline void free_peer_req(struct peerd *peer, struct peer_req *pr)
140 {
141         xqindex idx = pr - peer->peer_reqs;
142         pr->req = NULL;
143         xq_append_head(&peer->free_reqs, idx, 1);
144 }
145
/* Accumulators for time spent in xseg_respond(); the timing code in complete() is currently disabled. */
struct timeval resp_start, resp_end, resp_accum = {0, 0};
uint64_t responds = 0;

/*
 * Print the accumulated respond time and the number of responds to stdout.
 * tv_sec/tv_usec are cast explicitly because their underlying types
 * (time_t, suseconds_t) are not guaranteed to match the %lu conversion.
 */
void get_responds_stats(){
                printf("Time waiting respond %lu.%06lu sec for %llu times.\n",
                                (unsigned long) resp_accum.tv_sec,
                                (unsigned long) resp_accum.tv_usec,
                                (long long unsigned int) responds);
}
153
154 //FIXME error check
155 void fail(struct peerd *peer, struct peer_req *pr)
156 {
157         struct xseg_request *req = pr->req;
158         uint32_t p;
159         XSEGLOG2(&lc, D, "failing req %u", (unsigned int) (pr - peer->peer_reqs));
160         req->state |= XS_FAILED;
161         //xseg_set_req_data(peer->xseg, pr->req, NULL);
162         p = xseg_respond(peer->xseg, req, pr->portno, X_ALLOC);
163         xseg_signal(peer->xseg, p);
164         free_peer_req(peer, pr);
165         wake_up_next_thread(peer);
166 }
167
168 //FIXME error check
169 void complete(struct peerd *peer, struct peer_req *pr)
170 {
171         struct xseg_request *req = pr->req;
172         uint32_t p;
173         req->state |= XS_SERVED;
174         //xseg_set_req_data(peer->xseg, pr->req, NULL);
175         //gettimeofday(&resp_start, NULL);
176         p = xseg_respond(peer->xseg, req, pr->portno, X_ALLOC);
177         //gettimeofday(&resp_end, NULL);
178         //responds++;
179         //timersub(&resp_end, &resp_start, &resp_end);
180         //timeradd(&resp_end, &resp_accum, &resp_accum);
181         //printf("xseg_signal: %u\n", p);
182         xseg_signal(peer->xseg, p);
183         free_peer_req(peer, pr);
184         wake_up_next_thread(peer);
185 }
186
187 static void handle_accepted(struct peerd *peer, struct peer_req *pr, 
188                                 struct xseg_request *req)
189 {
190         struct xseg_request *xreq = pr->req;
191         //assert xreq == req;
192         XSEGLOG2(&lc, D, "Handle accepted");
193         xreq->serviced = 0;
194         //xreq->state = XS_ACCEPTED;
195         pr->retval = 0;
196         dispatch(peer, pr, req, dispatch_accept);
197 }
198
199 static void handle_received(struct peerd *peer, struct peer_req *pr,
200                                 struct xseg_request *req)
201 {
202         //struct xseg_request *req = pr->req;
203         //assert req->state != XS_ACCEPTED;
204         XSEGLOG2(&lc, D, "Handle received \n");
205         dispatch(peer, pr, req, dispatch_receive);
206
207 }
/* Accumulators for time spent in xseg_submit(); the timing code in submit_peer_req() is currently disabled. */
struct timeval sub_start, sub_end, sub_accum = {0, 0};
uint64_t submits = 0;

/*
 * Print the accumulated submit time and the number of submits to stdout.
 * tv_sec/tv_usec are cast explicitly because their underlying types
 * (time_t, suseconds_t) are not guaranteed to match the %lu conversion.
 */
void get_submits_stats(){
                printf("Time waiting submit %lu.%06lu sec for %llu times.\n",
                                (unsigned long) sub_accum.tv_sec,
                                (unsigned long) sub_accum.tv_usec,
                                (long long unsigned int) submits);
}
215
216 int submit_peer_req(struct peerd *peer, struct peer_req *pr)
217 {
218         uint32_t ret;
219         struct xseg_request *req = pr->req;
220         // assert req->portno == peer->portno ?
221         //TODO small function with error checking
222         XSEGLOG2 (&lc, D, "submitting peer req %u\n", (unsigned int)(pr - peer->peer_reqs));
223         ret = xseg_set_req_data(peer->xseg, req, (void *)(pr));
224         if (ret < 0)
225                 return -1;
226         //printf("pr: %x , req_data: %x \n", pr, xseg_get_req_data(peer->xseg, req));
227         //gettimeofday(&sub_start, NULL);
228         ret = xseg_submit(peer->xseg, req, pr->portno, X_ALLOC);
229         //gettimeofday(&sub_end, NULL);
230         //submits++;
231         //timersub(&sub_end, &sub_start, &sub_end);
232         //timeradd(&sub_end, &sub_accum, &sub_accum);
233         if (ret == NoPort)
234                 return -1;
235         xseg_signal(peer->xseg, ret);
236         return 0;
237 }
238
239 int thread_execute(struct peerd *peer, void (*func)(void *arg), void *arg)
240 {
241         struct thread *t = alloc_thread(peer);
242         if (t) {
243                 t->func = func;
244                 t->arg = arg;
245                 wake_up_thread(t);
246                 return 0;
247         } else
248                 // we could hijack a thread
249                 return -1;
250 }
251
252 static int check_ports(struct peerd *peer)
253 {
254         struct xseg *xseg = peer->xseg;
255         xport portno_start = peer->portno_start;
256         xport portno_end = peer->portno_end;
257         struct xseg_request *accepted, *received;
258         struct peer_req *pr;
259         xport i;
260         int  r, c = 0;
261
262         for (i = portno_start; i <= portno_end; i++) {
263                 accepted = NULL;
264                 received = NULL;
265                 pr = alloc_peer_req(peer);
266                 if (pr) {
267                         accepted = xseg_accept(xseg, i, X_NONBLOCK);
268                         if (accepted) {
269                                 pr->req = accepted;
270                                 pr->portno = i;
271                                 xseg_cancel_wait(xseg, i);
272                                 handle_accepted(peer, pr, accepted);
273                                 c = 1;
274                         }
275                         else {
276                                 free_peer_req(peer, pr);
277                         }
278                 }
279                 received = xseg_receive(xseg, i, X_NONBLOCK);
280                 if (received) {
281                         r =  xseg_get_req_data(xseg, received, (void **) &pr);
282                         if (r < 0 || !pr){
283                                 XSEGLOG2(&lc, W, "Received request with no pr data\n");
284                                 xport p = xseg_respond(peer->xseg, received, peer->portno_start, X_ALLOC);
285                                 if (p == NoPort){
286                                         XSEGLOG2(&lc, W, "Could not respond stale request");
287                                         xseg_put_request(xseg, received, portno_start);
288                                         continue;
289                                 } else {
290                                         xseg_signal(xseg, p);
291                                 }
292                         } else {
293                                 //maybe perform sanity check for pr
294                                 xseg_cancel_wait(xseg, i);
295                                 handle_received(peer, pr, received);
296                                 c = 1;
297                         }
298                 }
299         }
300
301         return c;
302 }
303
304 static void* thread_loop(void *arg)
305 {
306         struct thread *t = (struct thread *) arg;
307         struct peerd *peer = t->peer;
308         struct xseg *xseg = peer->xseg;
309         xport portno_start = peer->portno_start;
310         xport portno_end = peer->portno_end;
311         pid_t pid =syscall(SYS_gettid);
312         uint64_t loops;
313         uint64_t threshold=1000/(1 + portno_end - portno_start);
314         
315         XSEGLOG2(&lc, D, "thread %u\n",  (unsigned int) (t- peer->thread));
316
317         XSEGLOG2(&lc, I, "Thread %u has tid %u.\n", (unsigned int) (t- peer->thread), pid);
318         xseg_init_local_signal(xseg, peer->portno_start);
319         for (;;) {
320                 if (t->func) {
321                         XSEGLOG2(&lc, D, "Thread %u executes function\n", (unsigned int) (t- peer->thread));
322                         xseg_cancel_wait(xseg, peer->portno_start);
323                         t->func(t->arg);
324                         t->func = NULL;
325                         t->arg = NULL;
326                         continue;
327                 }
328
329                 for(loops= threshold; loops > 0; loops--) {
330                         if (loops == 1)
331                                 xseg_prepare_wait(xseg, peer->portno_start);
332                         if (check_ports(peer))
333                                 loops = threshold;
334                 }
335                 XSEGLOG2(&lc, I, "Thread %u goes to sleep\n", (unsigned int) (t- peer->thread));
336                 xseg_wait_signal(xseg, 10000000UL);
337                 xseg_cancel_wait(xseg, peer->portno_start);
338                 XSEGLOG2(&lc, I, "Thread %u woke up\n", (unsigned int) (t- peer->thread));
339         }
340         return NULL;
341 }
342
/*
 * Placeholder for deferring a request to peer->defer_portno.
 * The body is currently disabled, so calling this function has no effect;
 * the commented-out statements below sketch the intended submit/signal/free
 * sequence.
 */
void defer_request(struct peerd *peer, struct peer_req *pr)
{
        // assert canDefer(peer);
//      xseg_submit(peer->xseg, peer->defer_portno, pr->req);
//      xseg_signal(peer->xseg, peer->defer_portno);
//      free_peer_req(peer, pr);
}
350
351 static int peerd_loop(struct peerd *peer) 
352 {
353         if (peer->interactive_func)
354                 peer->interactive_func();
355         for (;;) {
356                 pthread_join(peer->thread[0].tid, NULL);
357         }
358         return 0;
359 }
360
361 static struct xseg *join(char *spec)
362 {
363         struct xseg_config config;
364         struct xseg *xseg;
365
366         (void)xseg_parse_spec(spec, &config);
367         xseg = xseg_join(config.type, config.name, "pthread", NULL);
368         if (xseg)
369                 return xseg;
370
371         (void)xseg_create(&config);
372         return xseg_join(config.type, config.name, "pthread", NULL);
373 }
374
375 int peerd_start_threads(struct peerd *peer)
376 {
377         int i;
378         uint32_t nr_threads = peer->nr_threads;
379         //TODO err check
380         for (i = 0; i < nr_threads; i++) {
381                 peer->thread[i].peer = peer;
382                 pthread_cond_init(&peer->thread[i].cond,NULL);
383                 pthread_mutex_init(&peer->thread[i].lock, NULL);
384                 pthread_create(&peer->thread[i].tid, NULL, thread_loop, (void *)(peer->thread + i));
385                 peer->thread[i].func = NULL;
386                 peer->thread[i].arg = NULL;
387
388         }
389         return 0;
390 }
391
392 static struct peerd* peerd_init(uint32_t nr_ops, char* spec, long portno_start,
393                         long portno_end, uint32_t nr_threads, uint32_t defer_portno)
394 {
395         int i;
396         struct peerd *peer;
397         struct xseg_port *port;
398         peer = malloc(sizeof(struct peerd));
399         if (!peer) {
400                 perror("malloc");
401                 return NULL;
402         }
403         peer->nr_ops = nr_ops;
404         peer->defer_portno = defer_portno;
405         peer->nr_threads = nr_threads;
406
407         peer->thread = calloc(nr_threads, sizeof(struct thread));
408         if (!peer->thread)
409                 goto malloc_fail;
410         peer->peer_reqs = calloc(nr_ops, sizeof(struct peer_req));
411         if (!peer->peer_reqs){
412 malloc_fail:
413                 perror("malloc");
414                 return NULL;
415         }
416
417         if (!xq_alloc_seq(&peer->free_reqs, nr_ops, nr_ops))
418                 goto malloc_fail;
419         if (!xq_alloc_empty(&peer->threads, nr_threads))
420                 goto malloc_fail;
421
422         if (xseg_initialize()){
423                 printf("cannot initialize library\n");
424                 return NULL;
425         }
426         peer->xseg = join(spec);
427         if (!peer->xseg) 
428                 return NULL;
429
430         peer->portno_start = (xport) portno_start;
431         peer->portno_end= (xport) portno_end;
432         port = xseg_bind_port(peer->xseg, peer->portno_start, NULL);
433         if (!port){
434                 printf("cannot bind to port %ld\n", peer->portno_start);
435                 return NULL;
436         }
437
438         xport p;
439         for (p = peer->portno_start + 1; p <= peer->portno_end; p++) {
440                 struct xseg_port *tmp;
441                 tmp = xseg_bind_port(peer->xseg, p, (void *)xseg_get_signal_desc(peer->xseg, port));
442                 if (!tmp){
443                         printf("cannot bind to port %ld\n", p);
444                         return NULL;
445                 }
446         }
447
448         printf("Peer on ports  %u-%u\n", peer->portno_start,
449                         peer->portno_end);
450
451         for (i = 0; i < nr_ops; i++) {
452                 peer->peer_reqs[i].peer = peer;
453                 peer->peer_reqs[i].req = NULL;
454                 peer->peer_reqs[i].retval = 0;
455                 peer->peer_reqs[i].priv = NULL;
456                 peer->peer_reqs[i].portno = NoPort;
457         }
458         peer->interactive_func = NULL;
459         return peer;
460 }
461
462
463 int main(int argc, char *argv[])
464 {
465         struct peerd *peer = NULL;
466         //parse args
467         char *spec = "";
468         int i, r;
469         long portno_start = -1, portno_end = -1, portno = -1;
470         //set defaults here
471         uint32_t nr_ops = 16;
472         uint32_t nr_threads = 16 ;
473         unsigned int debug_level = 0;
474         uint32_t defer_portno = NoPort;
475         char *logfile = NULL;
476
477         //capture here -g spec, -n nr_ops, -p portno, -t nr_threads -v verbose level
478         // -dp xseg_portno to defer blocking requests
479         // -l log file ?
480         //TODO print messages on arg parsing error
481         
482         for (i = 1; i < argc; i++) {
483                 if (!strcmp(argv[i], "-g") && i + 1 < argc) {
484                         spec = argv[i+1];
485                         i += 1;
486                         continue;
487                 }
488
489                 if (!strcmp(argv[i], "-sp") && i + 1 < argc) {
490                         portno_start = strtoul(argv[i+1], NULL, 10);
491                         i += 1;
492                         continue;
493                 }
494                 
495                 if (!strcmp(argv[i], "-ep") && i + 1 < argc) {
496                         portno_end = strtoul(argv[i+1], NULL, 10);
497                         i += 1;
498                         continue;
499                 }
500
501                 if (!strcmp(argv[i], "-p") && i + 1 < argc) {
502                         portno = strtoul(argv[i+1], NULL, 10);
503                         i += 1;
504                         continue;
505                 }
506
507                 if (!strcmp(argv[i], "-n") && i + 1 < argc) {
508                         nr_ops = strtoul(argv[i+1], NULL, 10);
509                         i += 1;
510                         continue;
511                 }
512                 if (!strcmp(argv[i], "-v") && i + 1 < argc ) {
513                         debug_level = atoi(argv[i+1]);
514                         i += 1;
515                         continue;
516                 }
517                 if (!strcmp(argv[i], "-t") && i + 1 < argc ) {
518                         nr_threads = strtoul(argv[i+1], NULL, 10);
519                         i += 1;
520                         continue;
521                 }
522                 if (!strcmp(argv[i], "-dp") && i + 1 < argc ) {
523                         defer_portno = strtoul(argv[i+1], NULL, 10);
524                         i += 1;
525                         continue;
526                 }
527                 if (!strcmp(argv[i], "-l") && i + 1 < argc ) {
528                         logfile = argv[i+1];
529                         i += 1;
530                         continue;
531                 }
532
533         }
534         init_logctx(&lc, argv[0], debug_level, logfile);
535         XSEGLOG2(&lc, D, "Main thread has tid %ld.\n", syscall(SYS_gettid));
536         
537         //TODO perform argument sanity checks
538         verbose = debug_level;
539         if (portno != -1) {
540                 portno_start = portno;
541                 portno_end = portno;
542         }
543
544         //TODO err check
545         peer = peerd_init(nr_ops, spec, portno_start, portno_end, nr_threads, defer_portno);
546         if (!peer)
547                 return -1;
548         r = custom_peer_init(peer, argc, argv);
549         if (r < 0)
550                 return -1;
551         peerd_start_threads(peer);
552         return peerd_loop(peer);
553 }