fix various bugs in mt-mapperd. add map write support
[archipelago] / xseg / peers / user / mpeer.c
1 #define _GNU_SOURCE
2 #include <stdio.h>
3 #include <stdlib.h>
4 #include <sys/types.h>
5 #include <unistd.h>
6 #include <xseg/xseg.h>
7 #include <pthread.h>
8 #include <mpeer.h>
9 #include <sys/syscall.h>
10 #include <sys/time.h>
11 #include <signal.h>
12
13 #define REARRANGE(__fun_name__, __format__, ...) __format__ "%s", __fun_name__, ##__VA_ARGS__
14 #define LOG(level, ...)                                              \
15                 do {                                                               \
16                         if (level <=  verbose) {                           \
17                                 fprintf(stderr, "%s: "  REARRANGE( __func__ , ## __VA_ARGS__, "" )); \
18                         }                                                          \
19                 }while (0)
20
21
22 unsigned int verbose = 0;
23
24 struct thread {
25         struct peerd *peer;
26         pthread_t tid;
27         pthread_cond_t cond;
28         pthread_mutex_t lock;
29         void (*func)(void *arg);
30         void *arg;
31 };
32
33
34 inline int canDefer(struct peerd *peer)
35 {
36         return !(peer->defer_portno == NoPort);
37 }
38
39 void print_req(struct xseg *xseg, struct xseg_request *req)
40 {
41         char target[64], data[64];
42         char *req_target, *req_data;
43         unsigned int end = (req->targetlen> 63) ? 63 : req->targetlen;
44         req_target = xseg_get_target(xseg, req);
45         req_data = xseg_get_data(xseg, req);
46
47         if (1) {
48                 strncpy(target, req_target, end);
49                 target[end] = 0;
50                 strncpy(data, req_data, 63);
51                 data[63] = 0;
52                 printf("req id:%lu, op:%u %llu:%lu serviced: %lu, reqstate: %u\n"
53                                 "src: %u, st: %u, dst: %u dt: %u\n"
54                                 "target[%u]:'%s', data[%llu]:\n%s------------------\n\n",
55                                 (unsigned long)(req),
56                                 (unsigned int)req->op,
57                                 (unsigned long long)req->offset,
58                                 (unsigned long)req->size,
59                                 (unsigned long)req->serviced,
60                                 (unsigned int)req->state,
61                                 (unsigned int)req->src_portno,
62                                 (unsigned int)req->src_transit_portno,
63                                 (unsigned int)req->dst_portno,
64                                 (unsigned int)req->dst_transit_portno,
65                                 (unsigned int)req->targetlen, target,
66                                 (unsigned long long)req->datalen, data);
67         }
68 }
69 void log_pr(char *msg, struct peer_req *pr)
70 {
71         char target[64], data[64];
72         char *req_target, *req_data;
73         struct peerd *peer = pr->peer;
74         struct xseg *xseg = pr->peer->xseg;
75         req_target = xseg_get_target(xseg, pr->req);
76         req_data = xseg_get_data(xseg, pr->req);
77         /* null terminate name in case of req->target is less than 63 characters,
78          *          * and next character after name (aka first byte of next buffer) is not
79          *                   * null
80          *                            */
81         unsigned int end = (pr->req->targetlen> 63) ? 63 : pr->req->targetlen;
82         if (verbose) {
83                 strncpy(target, req_target, end);
84                 target[end] = 0;
85                 strncpy(data, req_data, 63);
86                 data[63] = 0;
87                 printf("%s: req id:%u, op:%u %llu:%lu serviced: %lu, retval: %lu, reqstate: %u\n"
88                                 "target[%u]:'%s', data[%llu]:\n%s------------------\n\n",
89                                 msg,
90                                 (unsigned int)(pr - peer->peer_reqs),
91                                 (unsigned int)pr->req->op,
92                                 (unsigned long long)pr->req->offset,
93                                 (unsigned long)pr->req->size,
94                                 (unsigned long)pr->req->serviced,
95                                 (unsigned long)pr->retval,
96                                 (unsigned int)pr->req->state,
97                                 (unsigned int)pr->req->targetlen, target,
98                                 (unsigned long long)pr->req->datalen, data);
99         }
100 }
101
102 inline struct peer_req *alloc_peer_req(struct peerd *peer)
103 {
104         xqindex idx = xq_pop_head(&peer->free_reqs, 1);
105         if (idx == Noneidx)
106                 return NULL;
107         return peer->peer_reqs + idx;
108 }
109
110 inline void free_peer_req(struct peerd *peer, struct peer_req *pr)
111 {
112         xqindex idx = pr - peer->peer_reqs;
113         pr->req = NULL;
114         xq_append_head(&peer->free_reqs, idx, 1);
115 }
116
117 inline static struct thread* alloc_thread(struct peerd *peer)
118 {
119         xqindex idx = xq_pop_head(&peer->threads, 1);
120         if (idx == Noneidx)
121                 return NULL;
122         return peer->thread + idx;
123 }
124
125 inline static void free_thread(struct peerd *peer, struct thread *t)
126 {
127         xqindex idx = t - peer->thread;
128         xq_append_head(&peer->threads, idx, 1);
129 }
130
131
132 inline static void __wake_up_thread(struct thread *t)
133 {
134         pthread_mutex_lock(&t->lock);
135         pthread_cond_signal(&t->cond);
136         pthread_mutex_unlock(&t->lock);
137 }
138
139 inline static void wake_up_thread(struct thread* t)
140 {
141         if (t){
142                 __wake_up_thread(t);
143         }
144 }
145
146 inline static int wake_up_next_thread(struct peerd *peer)
147 {
148         //struct thread *t = alloc_thread(peer);
149         //wake_up_thread(t);
150         //return t;
151         return (xseg_signal(peer->xseg, peer->portno));
152 }
153
154 struct timeval resp_start, resp_end, resp_accum = {0, 0};
155 uint64_t responds = 0;
156 void get_responds_stats(){
157                 printf("Time waiting respond %lu.%06lu sec for %llu times.\n",
158                                 //(unsigned int)(t - peer->thread),
159                                 resp_accum.tv_sec, resp_accum.tv_usec, responds);
160 }
161
162 //FIXME error check
163 void fail(struct peerd *peer, struct peer_req *pr)
164 {
165         struct xseg_request *req = pr->req;
166         uint32_t p;
167         LOG(5, "failing req %u\n", (unsigned int) (pr - peer->peer_reqs));
168         req->state |= XS_FAILED;
169         //xseg_set_req_data(peer->xseg, pr->req, NULL);
170         p = xseg_respond(peer->xseg, req, peer->portno, X_ALLOC);
171         xseg_signal(peer->xseg, p);
172         free_peer_req(peer, pr);
173         wake_up_next_thread(peer);
174 }
175
176 //FIXME error check
177 void complete(struct peerd *peer, struct peer_req *pr)
178 {
179         struct xseg_request *req = pr->req;
180         uint32_t p;
181         req->state |= XS_SERVED;
182         //xseg_set_req_data(peer->xseg, pr->req, NULL);
183         //gettimeofday(&resp_start, NULL);
184         p = xseg_respond(peer->xseg, req, peer->portno, X_ALLOC);
185         //gettimeofday(&resp_end, NULL);
186         //responds++;
187         //timersub(&resp_end, &resp_start, &resp_end);
188         //timeradd(&resp_end, &resp_accum, &resp_accum);
189         //printf("xseg_signal: %u\n", p);
190         xseg_signal(peer->xseg, p);
191         free_peer_req(peer, pr);
192         wake_up_next_thread(peer);
193 }
194
195 void pending(struct peerd *peer, struct peer_req *pr)
196 {
197                 pr->req->state = XS_PENDING;
198 }
199
200 static void handle_accepted(struct peerd *peer, struct peer_req *pr, 
201                                 struct xseg_request *req)
202 {
203         struct xseg_request *xreq = pr->req;
204         //assert xreq == req;
205         LOG(4, "Handle accepted \n");
206         xreq->serviced = 0;
207         //xreq->state = XS_ACCEPTED;
208         pr->retval = 0;
209         dispatch(peer, pr, req);
210 }
211
212 static void handle_received(struct peerd *peer, struct peer_req *pr,
213                                 struct xseg_request *req)
214 {
215         //struct xseg_request *req = pr->req;
216         //assert req->state != XS_ACCEPTED;
217         LOG(4, "Handle received \n");
218         dispatch(peer, pr, req);
219
220 }
221 struct timeval sub_start, sub_end, sub_accum = {0, 0};
222 uint64_t submits = 0;
223 void get_submits_stats(){
224                 printf("Time waiting submit %lu.%06lu sec for %llu times.\n",
225                                 //(unsigned int)(t - peer->thread),
226                                 sub_accum.tv_sec, sub_accum.tv_usec, submits);
227 }
228
229 int submit_peer_req(struct peerd *peer, struct peer_req *pr)
230 {
231         uint32_t ret;
232         struct xseg_request *req = pr->req;
233         // assert req->portno == peer->portno ?
234         //TODO small function with error checking
235         LOG (5, "submitting peer req %u\n", (unsigned int)(pr - peer->peer_reqs));
236         ret = xseg_set_req_data(peer->xseg, req, (void *)(pr));
237         if (ret < 0)
238                 return -1;
239         //printf("pr: %x , req_data: %x \n", pr, xseg_get_req_data(peer->xseg, req));
240         //gettimeofday(&sub_start, NULL);
241         ret = xseg_submit(peer->xseg, req, peer->portno, X_ALLOC);
242         //gettimeofday(&sub_end, NULL);
243         //submits++;
244         //timersub(&sub_end, &sub_start, &sub_end);
245         //timeradd(&sub_end, &sub_accum, &sub_accum);
246         if (ret == NoPort)
247                 return -1;
248         xseg_signal(peer->xseg, ret);
249         return 0;
250 }
251
252 int thread_execute(struct peerd *peer, void (*func)(void *arg), void *arg)
253 {
254         struct thread *t = alloc_thread(peer);
255         if (t) {
256                 t->func = func;
257                 t->arg = arg;
258                 wake_up_thread(t);
259                 return 0;
260         } else
261                 // we could hijack a thread
262                 return -1;
263 }
264
265 static void* thread_loop(void *arg)
266 {
267         struct thread *t = (struct thread *) arg;
268         struct peerd *peer = t->peer;
269         struct xseg *xseg = peer->xseg;
270         uint32_t portno = peer->portno;
271         struct peer_req *pr;
272         uint64_t threshold=1000;
273         pid_t pid =syscall(SYS_gettid);
274         uint64_t loops;
275         struct xseg_request *accepted, *received;
276         int r;
277                 
278         printf("thread %u\n",  (unsigned int) (t- peer->thread));
279
280         LOG(0, "Thread %u has tid %u.\n", (unsigned int) (t- peer->thread), pid);
281         xseg_init_local_signal(xseg, portno);
282         for (;;) {
283                 if (t->func) {
284                         LOG(5, "Thread %u executes function\n", (unsigned int) (t- peer->thread));
285                         xseg_cancel_wait(xseg, portno);
286                         t->func(t->arg);
287                         t->func = NULL;
288                         t->arg = NULL;
289                         continue;
290                 }
291
292                 for(loops= threshold; loops > 0; loops--) {
293                         accepted = NULL;
294                         received = NULL;
295                         if (loops == 1)
296                                 xseg_prepare_wait(xseg, portno);
297
298 //                      if (xq_count(&peer->xport->request_queue)){
299                                 pr = alloc_peer_req(peer);
300                                 if (pr) {
301                                         accepted = xseg_accept(xseg, peer->portno);
302                                         LOG(5, "Thread %u accepted\n", (unsigned int) (t- peer->thread));
303                                         if (accepted) {
304                                                 pr->req = accepted;
305                                                 xseg_cancel_wait(xseg, portno);
306                                                 wake_up_next_thread(peer);
307                                                 handle_accepted(peer, pr, accepted);
308                                                 loops = threshold;
309                                         }
310                                         else {
311                                                 free_peer_req(peer, pr);
312                                         }
313                                 }
314 //                      }
315 //                      if (xq_count(&peer->xport->reply_queue)){
316                                 received = xseg_receive(xseg, peer->portno);
317                                 if (received) {
318                                         //printf("received req id: %u\n", received - xseg->requests);
319                                         //print_req(peer->xseg, received);
320                                         r =  xseg_get_req_data(xseg, received, (void **) &pr);
321                                         if (r < 0 || !pr){
322                                                 //FIXME what to do here ?
323                                                 LOG(0, "Received request with no pr data\n");
324                                                 xseg_respond(peer->xseg, received, peer->portno, X_ALLOC);
325                                         }
326                                         //fail(peer, received);
327                                         //assert pr->req == received;
328                                         xseg_cancel_wait(xseg, portno);
329                                         wake_up_next_thread(peer);
330                                         handle_received(peer, pr, received);
331                                         loops = threshold;
332                                 }
333 //                      }
334                 }
335                 LOG(1, "Thread %u goes to sleep\n", (unsigned int) (t- peer->thread));
336                 xseg_wait_signal(xseg, 10000000UL);
337                 xseg_cancel_wait(xseg, portno);
338                 LOG(1, "Thread %u woke up\n", (unsigned int) (t- peer->thread));
339         }
340         return NULL;
341 }
342
343 void defer_request(struct peerd *peer, struct peer_req *pr)
344 {
345         // assert canDefer(peer);
346 //      xseg_submit(peer->xseg, peer->defer_portno, pr->req);
347 //      xseg_signal(peer->xseg, peer->defer_portno);
348 //      free_peer_req(peer, pr);
349 }
350
351 static int peerd_loop(struct peerd *peer) 
352 {
353         if (peer->interactive_func)
354                 peer->interactive_func();
355         for (;;) {
356                 pthread_join(peer->thread[0].tid, NULL);
357         }
358         return 0;
359 }
360
361 static struct xseg *join(char *spec)
362 {
363         struct xseg_config config;
364         struct xseg *xseg;
365
366         (void)xseg_parse_spec(spec, &config);
367         xseg = xseg_join(config.type, config.name, "pthread", NULL);
368         if (xseg)
369                 return xseg;
370
371         (void)xseg_create(&config);
372         return xseg_join(config.type, config.name, "pthread", NULL);
373 }
374
375 int peerd_start_threads(struct peerd *peer)
376 {
377         int i;
378         uint32_t nr_threads = peer->nr_threads;
379         //TODO err check
380         for (i = 0; i < nr_threads; i++) {
381                 peer->thread[i].peer = peer;
382                 pthread_cond_init(&peer->thread[i].cond,NULL);
383                 pthread_mutex_init(&peer->thread[i].lock, NULL);
384                 pthread_create(&peer->thread[i].tid, NULL, thread_loop, (void *)(peer->thread + i));
385                 peer->thread[i].func = NULL;
386                 peer->thread[i].arg = NULL;
387
388         }
389         return 0;
390 }
391
392 static struct peerd* peerd_init(uint32_t nr_ops, char* spec, long portno, uint32_t nr_threads, uint32_t defer_portno)
393 {
394         int i;
395         struct peerd *peer;
396         peer = malloc(sizeof(struct peerd));
397         if (!peer) {
398                 perror("malloc");
399                 return NULL;
400         }
401         peer->nr_ops = nr_ops;
402         peer->defer_portno = defer_portno;
403         peer->nr_threads = nr_threads;
404
405         peer->thread = calloc(nr_threads, sizeof(struct thread));
406         if (!peer->thread)
407                 goto malloc_fail;
408         peer->peer_reqs = calloc(nr_ops, sizeof(struct peer_req));
409         if (!peer->peer_reqs){
410 malloc_fail:
411                 perror("malloc");
412                 return NULL;
413         }
414
415         if (!xq_alloc_seq(&peer->free_reqs, nr_ops, nr_ops))
416                 goto malloc_fail;
417         if (!xq_alloc_empty(&peer->threads, nr_threads))
418                 goto malloc_fail;
419
420         if (xseg_initialize()){
421                 printf("cannot initialize library\n");
422                 return NULL;
423         }
424         peer->xseg = join(spec);
425         if (!peer->xseg) 
426                 return NULL;
427
428         peer->xport = xseg_bind_port(peer->xseg, portno);
429         if (!peer->xport){
430                 printf("cannot bind to port %ld\n", portno);
431                 return NULL;
432         }
433         printf("%lx\n", (unsigned long) peer->xport);
434         peer->portno = xseg_portno(peer->xseg, peer->xport);
435         printf("Peer on port %u/%u\n", peer->portno,
436                         peer->xseg->config.nr_ports);
437
438         for (i = 0; i < nr_ops; i++) {
439                 peer->peer_reqs[i].peer = peer;
440                 peer->peer_reqs[i].req = NULL;
441                 peer->peer_reqs[i].retval = 0;
442                 peer->peer_reqs[i].priv = NULL;
443         }
444         peer->interactive_func = NULL;
445         return peer;
446 }
447
448
449 int main(int argc, const char *argv[])
450 {
451         struct peerd *peer = NULL;
452         //parse args
453         char *spec = "";
454         int i, r;
455         long portno = -1;
456         //set defaults here
457         uint32_t nr_ops = 16;
458         uint32_t nr_threads = 16 ;
459         unsigned int debug_level = 0;
460         uint32_t defer_portno = NoPort;
461         
462         //capture here -g spec, -n nr_ops, -p portno, -t nr_threads -v verbose level
463         // -dp xseg_portno to defer blocking requests
464         //maybe -l log file ?
465         //TODO print messages on arg parsing error
466         LOG(5, "Main thread has tid %ld.\n", syscall(SYS_gettid));
467         
468         for (i = 1; i < argc; i++) {
469                 if (!strcmp(argv[i], "-g") && i + 1 < argc) {
470                         spec = argv[i+1];
471                         i += 1;
472                         continue;
473                 }
474
475                 if (!strcmp(argv[i], "-p") && i + 1 < argc) {
476                         portno = strtoul(argv[i+1], NULL, 10);
477                         i += 1;
478                         continue;
479                 }
480
481                 if (!strcmp(argv[i], "-n") && i + 1 < argc) {
482                         nr_ops = strtoul(argv[i+1], NULL, 10);
483                         i += 1;
484                         continue;
485                 }
486                 if (!strcmp(argv[i], "-v") && i + 1 < argc ) {
487                         debug_level = atoi(argv[i+1]);
488                         i += 1;
489                         continue;
490                 }
491                 if (!strcmp(argv[i], "-t") && i + 1 < argc ) {
492                         nr_threads = strtoul(argv[i+1], NULL, 10);
493                         i += 1;
494                         continue;
495                 }
496                 if (!strcmp(argv[i], "-dp") && i + 1 < argc ) {
497                         defer_portno = strtoul(argv[i+1], NULL, 10);
498                         i += 1;
499                         continue;
500                 }
501
502         }
503         
504         //TODO perform argument sanity checks
505         verbose = debug_level;
506
507         //TODO err check
508         peer = peerd_init(nr_ops, spec, portno, nr_threads, defer_portno);
509         r = custom_peer_init(peer, argc, argv);
510         if (r < 0)
511                 return -1;
512         peerd_start_threads(peer);
513         return peerd_loop(peer);
514 }