added speer skeleton
[archipelago] / xseg / peers / user / mpeer.c
1 #define _GNU_SOURCE
2 #include <stdio.h>
3 #include <stdlib.h>
4 #include <sys/types.h>
5 #include <unistd.h>
6 #include <xseg/xseg.h>
7 #include <pthread.h>
8 #include <mpeer.h>
9 #include <sys/syscall.h>
10 #include <sys/time.h>
11 #include <signal.h>
12
13 #define REARRANGE(__fun_name__, __format__, ...) __format__ "%s", __fun_name__, ##__VA_ARGS__
14 #define LOG(level, ...)                                              \
15                 do {                                                               \
16                         if (level <=  verbose) {                           \
17                                 fprintf(stderr, "%s: "  REARRANGE( __func__ , ## __VA_ARGS__, "" )); \
18                         }                                                          \
19                 }while (0)
20
21
22 unsigned int verbose = 0;
23 struct log_ctx lc;
24
/*
 * Per-worker state; peer->thread[] holds one entry for each worker thread.
 */
struct thread {
	struct peerd *peer;		/* owning peer, set by peerd_start_threads() */
	pthread_t tid;			/* handle filled in by pthread_create() */
	pthread_cond_t cond;		/* signalled by __wake_up_thread() */
	pthread_mutex_t lock;		/* held while cond is being signalled */
	void (*func)(void *arg);	/* one-shot job run by thread_loop(), or NULL */
	void *arg;			/* argument passed to func */
};
33
34
35 inline int canDefer(struct peerd *peer)
36 {
37         return !(peer->defer_portno == NoPort);
38 }
39
40 void print_req(struct xseg *xseg, struct xseg_request *req)
41 {
42         char target[64], data[64];
43         char *req_target, *req_data;
44         unsigned int end = (req->targetlen> 63) ? 63 : req->targetlen;
45         req_target = xseg_get_target(xseg, req);
46         req_data = xseg_get_data(xseg, req);
47
48         if (1) {
49                 strncpy(target, req_target, end);
50                 target[end] = 0;
51                 strncpy(data, req_data, 63);
52                 data[63] = 0;
53                 printf("req id:%lu, op:%u %llu:%lu serviced: %lu, reqstate: %u\n"
54                                 "src: %u, st: %u, dst: %u dt: %u\n"
55                                 "target[%u]:'%s', data[%llu]:\n%s------------------\n\n",
56                                 (unsigned long)(req),
57                                 (unsigned int)req->op,
58                                 (unsigned long long)req->offset,
59                                 (unsigned long)req->size,
60                                 (unsigned long)req->serviced,
61                                 (unsigned int)req->state,
62                                 (unsigned int)req->src_portno,
63                                 (unsigned int)req->src_transit_portno,
64                                 (unsigned int)req->dst_portno,
65                                 (unsigned int)req->dst_transit_portno,
66                                 (unsigned int)req->targetlen, target,
67                                 (unsigned long long)req->datalen, data);
68         }
69 }
70 void log_pr(char *msg, struct peer_req *pr)
71 {
72         char target[64], data[64];
73         char *req_target, *req_data;
74         struct peerd *peer = pr->peer;
75         struct xseg *xseg = pr->peer->xseg;
76         req_target = xseg_get_target(xseg, pr->req);
77         req_data = xseg_get_data(xseg, pr->req);
78         /* null terminate name in case of req->target is less than 63 characters,
79          * and next character after name (aka first byte of next buffer) is not
80          * null
81          */
82         unsigned int end = (pr->req->targetlen> 63) ? 63 : pr->req->targetlen;
83         if (verbose) {
84                 strncpy(target, req_target, end);
85                 target[end] = 0;
86                 strncpy(data, req_data, 63);
87                 data[63] = 0;
88                 printf("%s: req id:%u, op:%u %llu:%lu serviced: %lu, retval: %lu, reqstate: %u\n"
89                                 "target[%u]:'%s', data[%llu]:\n%s------------------\n\n",
90                                 msg,
91                                 (unsigned int)(pr - peer->peer_reqs),
92                                 (unsigned int)pr->req->op,
93                                 (unsigned long long)pr->req->offset,
94                                 (unsigned long)pr->req->size,
95                                 (unsigned long)pr->req->serviced,
96                                 (unsigned long)pr->retval,
97                                 (unsigned int)pr->req->state,
98                                 (unsigned int)pr->req->targetlen, target,
99                                 (unsigned long long)pr->req->datalen, data);
100         }
101 }
102
103 inline struct peer_req *alloc_peer_req(struct peerd *peer)
104 {
105         xqindex idx = xq_pop_head(&peer->free_reqs, 1);
106         if (idx == Noneidx)
107                 return NULL;
108         return peer->peer_reqs + idx;
109 }
110
111 inline void free_peer_req(struct peerd *peer, struct peer_req *pr)
112 {
113         xqindex idx = pr - peer->peer_reqs;
114         pr->req = NULL;
115         xq_append_head(&peer->free_reqs, idx, 1);
116 }
117
118 inline static struct thread* alloc_thread(struct peerd *peer)
119 {
120         xqindex idx = xq_pop_head(&peer->threads, 1);
121         if (idx == Noneidx)
122                 return NULL;
123         return peer->thread + idx;
124 }
125
126 inline static void free_thread(struct peerd *peer, struct thread *t)
127 {
128         xqindex idx = t - peer->thread;
129         xq_append_head(&peer->threads, idx, 1);
130 }
131
132
133 inline static void __wake_up_thread(struct thread *t)
134 {
135         pthread_mutex_lock(&t->lock);
136         pthread_cond_signal(&t->cond);
137         pthread_mutex_unlock(&t->lock);
138 }
139
/*
 * NULL-tolerant front end for __wake_up_thread(): does nothing when t is
 * NULL, otherwise signals the thread.
 */
static inline void wake_up_thread(struct thread* t)
{
	if (!t)
		return;
	__wake_up_thread(t);
}
146
147 inline static int wake_up_next_thread(struct peerd *peer)
148 {
149         //struct thread *t = alloc_thread(peer);
150         //wake_up_thread(t);
151         //return t;
152         return (xseg_signal(peer->xseg, peer->portno_start));
153 }
154
/*
 * Accounting for time spent in xseg_respond(): the start/end stamps of the
 * most recent call, the accumulated duration, and the number of calls.
 * (The code that updates them in complete() is currently commented out.)
 */
struct timeval resp_start, resp_end, resp_accum = {0, 0};
uint64_t responds = 0;

/*
 * Print the accumulated respond time and the respond count to stdout.
 *
 * The timeval members are cast explicitly because time_t and suseconds_t are
 * not guaranteed to be unsigned long, which is what "%lu" requires.
 */
void get_responds_stats(void)
{
	printf("Time waiting respond %lu.%06lu sec for %llu times.\n",
			(unsigned long) resp_accum.tv_sec,
			(unsigned long) resp_accum.tv_usec,
			(unsigned long long) responds);
}
162
163 //FIXME error check
164 void fail(struct peerd *peer, struct peer_req *pr)
165 {
166         struct xseg_request *req = pr->req;
167         uint32_t p;
168         XSEGLOG2(&lc, D, "failing req %u", (unsigned int) (pr - peer->peer_reqs));
169         req->state |= XS_FAILED;
170         //xseg_set_req_data(peer->xseg, pr->req, NULL);
171         p = xseg_respond(peer->xseg, req, pr->portno, X_ALLOC);
172         xseg_signal(peer->xseg, p);
173         free_peer_req(peer, pr);
174         wake_up_next_thread(peer);
175 }
176
177 //FIXME error check
178 void complete(struct peerd *peer, struct peer_req *pr)
179 {
180         struct xseg_request *req = pr->req;
181         uint32_t p;
182         req->state |= XS_SERVED;
183         //xseg_set_req_data(peer->xseg, pr->req, NULL);
184         //gettimeofday(&resp_start, NULL);
185         p = xseg_respond(peer->xseg, req, pr->portno, X_ALLOC);
186         //gettimeofday(&resp_end, NULL);
187         //responds++;
188         //timersub(&resp_end, &resp_start, &resp_end);
189         //timeradd(&resp_end, &resp_accum, &resp_accum);
190         //printf("xseg_signal: %u\n", p);
191         xseg_signal(peer->xseg, p);
192         free_peer_req(peer, pr);
193         wake_up_next_thread(peer);
194 }
195
196 void pending(struct peerd *peer, struct peer_req *pr)
197 {
198                 pr->req->state = XS_PENDING;
199 }
200
201 static void handle_accepted(struct peerd *peer, struct peer_req *pr, 
202                                 struct xseg_request *req)
203 {
204         struct xseg_request *xreq = pr->req;
205         //assert xreq == req;
206         XSEGLOG2(&lc, D, "Handle accepted");
207         xreq->serviced = 0;
208         //xreq->state = XS_ACCEPTED;
209         pr->retval = 0;
210         dispatch(peer, pr, req);
211 }
212
213 static void handle_received(struct peerd *peer, struct peer_req *pr,
214                                 struct xseg_request *req)
215 {
216         //struct xseg_request *req = pr->req;
217         //assert req->state != XS_ACCEPTED;
218         XSEGLOG2(&lc, D, "Handle received \n");
219         dispatch(peer, pr, req);
220
221 }
/*
 * Accounting for time spent in xseg_submit(): the start/end stamps of the
 * most recent call, the accumulated duration, and the number of calls.
 * (The code that updates them in submit_peer_req() is currently commented
 * out.)
 */
struct timeval sub_start, sub_end, sub_accum = {0, 0};
uint64_t submits = 0;

/*
 * Print the accumulated submit time and the submit count to stdout.
 *
 * The timeval members are cast explicitly because time_t and suseconds_t are
 * not guaranteed to be unsigned long, which is what "%lu" requires.
 */
void get_submits_stats(void)
{
	printf("Time waiting submit %lu.%06lu sec for %llu times.\n",
			(unsigned long) sub_accum.tv_sec,
			(unsigned long) sub_accum.tv_usec,
			(unsigned long long) submits);
}
229
230 int submit_peer_req(struct peerd *peer, struct peer_req *pr)
231 {
232         uint32_t ret;
233         struct xseg_request *req = pr->req;
234         // assert req->portno == peer->portno ?
235         //TODO small function with error checking
236         XSEGLOG2 (&lc, D, "submitting peer req %u\n", (unsigned int)(pr - peer->peer_reqs));
237         ret = xseg_set_req_data(peer->xseg, req, (void *)(pr));
238         if (ret < 0)
239                 return -1;
240         //printf("pr: %x , req_data: %x \n", pr, xseg_get_req_data(peer->xseg, req));
241         //gettimeofday(&sub_start, NULL);
242         ret = xseg_submit(peer->xseg, req, pr->portno, X_ALLOC);
243         //gettimeofday(&sub_end, NULL);
244         //submits++;
245         //timersub(&sub_end, &sub_start, &sub_end);
246         //timeradd(&sub_end, &sub_accum, &sub_accum);
247         if (ret == NoPort)
248                 return -1;
249         xseg_signal(peer->xseg, ret);
250         return 0;
251 }
252
253 int thread_execute(struct peerd *peer, void (*func)(void *arg), void *arg)
254 {
255         struct thread *t = alloc_thread(peer);
256         if (t) {
257                 t->func = func;
258                 t->arg = arg;
259                 wake_up_thread(t);
260                 return 0;
261         } else
262                 // we could hijack a thread
263                 return -1;
264 }
265
266 static void* thread_loop(void *arg)
267 {
268         struct thread *t = (struct thread *) arg;
269         struct peerd *peer = t->peer;
270         struct xseg *xseg = peer->xseg;
271         xport portno_start = peer->portno_start;
272         xport portno_end = peer->portno_end;
273         struct peer_req *pr;
274         uint64_t threshold=1;
275         pid_t pid =syscall(SYS_gettid);
276         uint64_t loops;
277         struct xseg_request *accepted, *received;
278         int r;
279         int change;
280         xport i;
281                 
282         XSEGLOG2(&lc, D, "thread %u\n",  (unsigned int) (t- peer->thread));
283
284         XSEGLOG2(&lc, I, "Thread %u has tid %u.\n", (unsigned int) (t- peer->thread), pid);
285         xseg_init_local_signal(xseg, peer->portno_start);
286         for (;;) {
287                 if (t->func) {
288                         XSEGLOG2(&lc, D, "Thread %u executes function\n", (unsigned int) (t- peer->thread));
289                         xseg_cancel_wait(xseg, peer->portno_start);
290                         t->func(t->arg);
291                         t->func = NULL;
292                         t->arg = NULL;
293                         continue;
294                 }
295
296                 for(loops= threshold; loops > 0; loops--) {
297                         do {
298                                 if (loops == 1)
299                                         xseg_prepare_wait(xseg, peer->portno_start);
300                                 change = 0;
301                                 for (i = portno_start; i <= portno_end; i++) {
302                                         accepted = NULL;
303                                         received = NULL;
304                                         pr = alloc_peer_req(peer);
305                                         if (pr) {
306                                                 accepted = xseg_accept(xseg, i, X_NONBLOCK);
307                                                 if (accepted) {
308                                                         XSEGLOG2(&lc, D, "Thread %u accepted\n", (unsigned int) (t- peer->thread));
309                                                         pr->req = accepted;
310                                                         pr->portno = i;
311                                                         xseg_cancel_wait(xseg, i);
312                                                         wake_up_next_thread(peer);
313                                                         handle_accepted(peer, pr, accepted);
314                                                         change = 1;
315                                                 }
316                                                 else {
317                                                         free_peer_req(peer, pr);
318                                                 }
319                                         }
320                                         received = xseg_receive(xseg, i, X_NONBLOCK);
321                                         if (received) {
322                                                 //printf("received req id: %u\n", received - xseg->requests);
323                                                 //print_req(peer->xseg, received);
324                                                 r =  xseg_get_req_data(xseg, received, (void **) &pr);
325                                                 if (r < 0 || !pr){
326                                                         //FIXME what to do here ?
327                                                         XSEGLOG2(&lc, W, "Received request with no pr data\n");
328                                                         xseg_respond(peer->xseg, received, peer->portno_start, X_ALLOC);
329                                                         //FIXME if fails, put req
330                                                 }
331                                                 xseg_cancel_wait(xseg, i);
332                                                 wake_up_next_thread(peer);
333                                                 handle_received(peer, pr, received);
334                                                 change = 1;
335                                         }
336                                 }
337                                 if (change)
338                                         loops = threshold;
339                         }while (change);
340                 }
341                 XSEGLOG2(&lc, I, "Thread %u goes to sleep\n", (unsigned int) (t- peer->thread));
342                 xseg_wait_signal(xseg, 10000000UL);
343                 xseg_cancel_wait(xseg, peer->portno_start);
344                 XSEGLOG2(&lc, I, "Thread %u woke up\n", (unsigned int) (t- peer->thread));
345         }
346         return NULL;
347 }
348
/*
 * Placeholder for deferring a request to another port; currently a no-op.
 *
 * The disabled sketch this replaces would have submitted pr->req to
 * peer->defer_portno, signalled that port and released pr. Callers are
 * expected to check canDefer(peer) first (not asserted).
 */
void defer_request(struct peerd *peer, struct peer_req *pr)
{
	/* intentionally empty */
}
356
357 static int peerd_loop(struct peerd *peer) 
358 {
359         if (peer->interactive_func)
360                 peer->interactive_func();
361         for (;;) {
362                 pthread_join(peer->thread[0].tid, NULL);
363         }
364         return 0;
365 }
366
367 static struct xseg *join(char *spec)
368 {
369         struct xseg_config config;
370         struct xseg *xseg;
371
372         (void)xseg_parse_spec(spec, &config);
373         xseg = xseg_join(config.type, config.name, "pthread", NULL);
374         if (xseg)
375                 return xseg;
376
377         (void)xseg_create(&config);
378         return xseg_join(config.type, config.name, "pthread", NULL);
379 }
380
381 int peerd_start_threads(struct peerd *peer)
382 {
383         int i;
384         uint32_t nr_threads = peer->nr_threads;
385         //TODO err check
386         for (i = 0; i < nr_threads; i++) {
387                 peer->thread[i].peer = peer;
388                 pthread_cond_init(&peer->thread[i].cond,NULL);
389                 pthread_mutex_init(&peer->thread[i].lock, NULL);
390                 pthread_create(&peer->thread[i].tid, NULL, thread_loop, (void *)(peer->thread + i));
391                 peer->thread[i].func = NULL;
392                 peer->thread[i].arg = NULL;
393
394         }
395         return 0;
396 }
397
398 static struct peerd* peerd_init(uint32_t nr_ops, char* spec, long portno_start,
399                         long portno_end, uint32_t nr_threads, uint32_t defer_portno)
400 {
401         int i;
402         struct peerd *peer;
403         peer = malloc(sizeof(struct peerd));
404         if (!peer) {
405                 perror("malloc");
406                 return NULL;
407         }
408         peer->nr_ops = nr_ops;
409         peer->defer_portno = defer_portno;
410         peer->nr_threads = nr_threads;
411
412         peer->thread = calloc(nr_threads, sizeof(struct thread));
413         if (!peer->thread)
414                 goto malloc_fail;
415         peer->peer_reqs = calloc(nr_ops, sizeof(struct peer_req));
416         if (!peer->peer_reqs){
417 malloc_fail:
418                 perror("malloc");
419                 return NULL;
420         }
421
422         if (!xq_alloc_seq(&peer->free_reqs, nr_ops, nr_ops))
423                 goto malloc_fail;
424         if (!xq_alloc_empty(&peer->threads, nr_threads))
425                 goto malloc_fail;
426
427         if (xseg_initialize()){
428                 printf("cannot initialize library\n");
429                 return NULL;
430         }
431         peer->xseg = join(spec);
432         if (!peer->xseg) 
433                 return NULL;
434
435         peer->portno_start = (xport) portno_start;
436         peer->portno_end= (xport) portno_end;
437         peer->port = xseg_bind_port(peer->xseg, peer->portno_start, NULL);
438         if (!peer->port){
439                 printf("cannot bind to port %ld\n", peer->portno_start);
440                 return NULL;
441         }
442
443         xport p;
444         for (p = peer->portno_start + 1; p <= peer->portno_end; p++) {
445                 struct xseg_port *tmp;
446                 tmp = xseg_bind_port(peer->xseg, p, (void *)xseg_get_signal_desc(peer->xseg, peer->port));
447                 if (!tmp){
448                         printf("cannot bind to port %ld\n", p);
449                         return NULL;
450                 }
451         }
452
453         printf("Peer on ports  %u-%u\n", peer->portno_start,
454                         peer->portno_end);
455
456         for (i = 0; i < nr_ops; i++) {
457                 peer->peer_reqs[i].peer = peer;
458                 peer->peer_reqs[i].req = NULL;
459                 peer->peer_reqs[i].retval = 0;
460                 peer->peer_reqs[i].priv = NULL;
461         }
462         peer->interactive_func = NULL;
463         return peer;
464 }
465
466
467 int main(int argc, char *argv[])
468 {
469         struct peerd *peer = NULL;
470         //parse args
471         char *spec = "";
472         int i, r;
473         long portno_start = -1, portno_end = -1;
474         //set defaults here
475         uint32_t nr_ops = 16;
476         uint32_t nr_threads = 16 ;
477         unsigned int debug_level = 0;
478         uint32_t defer_portno = NoPort;
479         char *logfile = NULL;
480
481         //capture here -g spec, -n nr_ops, -p portno, -t nr_threads -v verbose level
482         // -dp xseg_portno to defer blocking requests
483         // -l log file ?
484         //TODO print messages on arg parsing error
485         
486         for (i = 1; i < argc; i++) {
487                 if (!strcmp(argv[i], "-g") && i + 1 < argc) {
488                         spec = argv[i+1];
489                         i += 1;
490                         continue;
491                 }
492
493                 if (!strcmp(argv[i], "-sp") && i + 1 < argc) {
494                         portno_start = strtoul(argv[i+1], NULL, 10);
495                         i += 1;
496                         continue;
497                 }
498                 
499                 if (!strcmp(argv[i], "-ep") && i + 1 < argc) {
500                         portno_end = strtoul(argv[i+1], NULL, 10);
501                         i += 1;
502                         continue;
503                 }
504
505                 if (!strcmp(argv[i], "-n") && i + 1 < argc) {
506                         nr_ops = strtoul(argv[i+1], NULL, 10);
507                         i += 1;
508                         continue;
509                 }
510                 if (!strcmp(argv[i], "-v") && i + 1 < argc ) {
511                         debug_level = atoi(argv[i+1]);
512                         i += 1;
513                         continue;
514                 }
515                 if (!strcmp(argv[i], "-t") && i + 1 < argc ) {
516                         nr_threads = strtoul(argv[i+1], NULL, 10);
517                         i += 1;
518                         continue;
519                 }
520                 if (!strcmp(argv[i], "-dp") && i + 1 < argc ) {
521                         defer_portno = strtoul(argv[i+1], NULL, 10);
522                         i += 1;
523                         continue;
524                 }
525                 if (!strcmp(argv[i], "-l") && i + 1 < argc ) {
526                         logfile = argv[i+1];
527                         i += 1;
528                         continue;
529                 }
530
531         }
532         init_logctx(&lc, argv[0], debug_level, logfile);
533         XSEGLOG2(&lc, D, "Main thread has tid %ld.\n", syscall(SYS_gettid));
534         
535         //TODO perform argument sanity checks
536         verbose = debug_level;
537
538         //TODO err check
539         peer = peerd_init(nr_ops, spec, portno_start, portno_end, nr_threads, defer_portno);
540         if (!peer)
541                 return -1;
542         r = custom_peer_init(peer, argc, argv);
543         if (r < 0)
544                 return -1;
545         peerd_start_threads(peer);
546         return peerd_loop(peer);
547 }