add obj handlers output in xseg reportall
[archipelago] / xseg / peers / user / speer.c
1 #define _GNU_SOURCE
2 #include <stdio.h>
3 #include <stdlib.h>
4 #include <sys/types.h>
5 #include <unistd.h>
6 #include <xseg/xseg.h>
7 #include <speer.h>
8 #include <sys/syscall.h>
9 #include <sys/time.h>
10 #include <signal.h>
11
12 #ifdef ST_THREADS
13 #include <st.h>
14 uint32_t ta = 0;
15 #endif
16
17 unsigned int verbose;
18 struct log_ctx lc;
19
20 inline int canDefer(struct peerd *peer)
21 {
22         return !(peer->defer_portno == NoPort);
23 }
24
25 void print_req(struct xseg *xseg, struct xseg_request *req)
26 {
27         char target[64], data[64];
28         char *req_target, *req_data;
29         unsigned int end = (req->targetlen> 63) ? 63 : req->targetlen;
30         req_target = xseg_get_target(xseg, req);
31         req_data = xseg_get_data(xseg, req);
32
33         if (1) {
34                 strncpy(target, req_target, end);
35                 target[end] = 0;
36                 strncpy(data, req_data, 63);
37                 data[63] = 0;
38                 printf("req id:%lu, op:%u %llu:%lu serviced: %lu, reqstate: %u\n"
39                                 "src: %u, st: %u, dst: %u dt: %u\n"
40                                 "target[%u]:'%s', data[%llu]:\n%s------------------\n\n",
41                                 (unsigned long)(req),
42                                 (unsigned int)req->op,
43                                 (unsigned long long)req->offset,
44                                 (unsigned long)req->size,
45                                 (unsigned long)req->serviced,
46                                 (unsigned int)req->state,
47                                 (unsigned int)req->src_portno,
48                                 (unsigned int)req->src_transit_portno,
49                                 (unsigned int)req->dst_portno,
50                                 (unsigned int)req->dst_transit_portno,
51                                 (unsigned int)req->targetlen, target,
52                                 (unsigned long long)req->datalen, data);
53         }
54 }
55
56 void log_pr(char *msg, struct peer_req *pr)
57 {
58         char target[64], data[64];
59         char *req_target, *req_data;
60         struct peerd *peer = pr->peer;
61         struct xseg *xseg = pr->peer->xseg;
62         req_target = xseg_get_target(xseg, pr->req);
63         req_data = xseg_get_data(xseg, pr->req);
64         /* null terminate name in case of req->target is less than 63 characters,
65          * and next character after name (aka first byte of next buffer) is not
66          * null
67          */
68         unsigned int end = (pr->req->targetlen> 63) ? 63 : pr->req->targetlen;
69         if (verbose) {
70                 strncpy(target, req_target, end);
71                 target[end] = 0;
72                 strncpy(data, req_data, 63);
73                 data[63] = 0;
74                 printf("%s: req id:%u, op:%u %llu:%lu serviced: %lu, retval: %lu, reqstate: %u\n"
75                                 "target[%u]:'%s', data[%llu]:\n%s------------------\n\n",
76                                 msg,
77                                 (unsigned int)(pr - peer->peer_reqs),
78                                 (unsigned int)pr->req->op,
79                                 (unsigned long long)pr->req->offset,
80                                 (unsigned long)pr->req->size,
81                                 (unsigned long)pr->req->serviced,
82                                 (unsigned long)pr->retval,
83                                 (unsigned int)pr->req->state,
84                                 (unsigned int)pr->req->targetlen, target,
85                                 (unsigned long long)pr->req->datalen, data);
86         }
87 }
88
89 inline struct peer_req *alloc_peer_req(struct peerd *peer)
90 {
91         xqindex idx = xq_pop_head(&peer->free_reqs, 1);
92         if (idx == Noneidx)
93                 return NULL;
94         return peer->peer_reqs + idx;
95 }
96
97 inline void free_peer_req(struct peerd *peer, struct peer_req *pr)
98 {
99         xqindex idx = pr - peer->peer_reqs;
100         pr->req = NULL;
101         xq_append_head(&peer->free_reqs, idx, 1);
102 }
103
104 struct timeval resp_start, resp_end, resp_accum = {0, 0};
105 uint64_t responds = 0;
106 void get_responds_stats(){
107                 printf("Time waiting respond %lu.%06lu sec for %llu times.\n",
108                                 resp_accum.tv_sec, resp_accum.tv_usec, (long long unsigned int) responds);
109 }
110
111 //FIXME error check
112 void fail(struct peerd *peer, struct peer_req *pr)
113 {
114         struct xseg_request *req = pr->req;
115         uint32_t p;
116         XSEGLOG2(&lc, D, "failing req %u", (unsigned int) (pr - peer->peer_reqs));
117         req->state |= XS_FAILED;
118         //xseg_set_req_data(peer->xseg, pr->req, NULL);
119         p = xseg_respond(peer->xseg, req, pr->portno, X_ALLOC);
120         if (xseg_signal(peer->xseg, p) < 0)
121                 XSEGLOG2(&lc, W, "Cannot signal portno %u", p);
122         free_peer_req(peer, pr);
123 }
124
125 //FIXME error check
126 void complete(struct peerd *peer, struct peer_req *pr)
127 {
128         struct xseg_request *req = pr->req;
129         uint32_t p;
130         req->state |= XS_SERVED;
131         //xseg_set_req_data(peer->xseg, pr->req, NULL);
132         //gettimeofday(&resp_start, NULL);
133         p = xseg_respond(peer->xseg, req, pr->portno, X_ALLOC);
134         //gettimeofday(&resp_end, NULL);
135         //responds++;
136         //timersub(&resp_end, &resp_start, &resp_end);
137         //timeradd(&resp_end, &resp_accum, &resp_accum);
138         //printf("xseg_signal: %u\n", p);
139         if (xseg_signal(peer->xseg, p) < 0)
140                 XSEGLOG2(&lc, W, "Cannot signal portno %u", p);
141         free_peer_req(peer, pr);
142 }
143
144 static void handle_accepted(struct peerd *peer, struct peer_req *pr, 
145                                 struct xseg_request *req)
146 {
147         struct xseg_request *xreq = pr->req;
148         //assert xreq == req;
149         XSEGLOG2(&lc, D, "Handle accepted");
150         xreq->serviced = 0;
151         pr->retval = 0;
152         dispatch(peer, pr, req, dispatch_accept);
153 }
154
155 static void handle_received(struct peerd *peer, struct peer_req *pr,
156                                 struct xseg_request *req)
157 {
158         //struct xseg_request *req = pr->req;
159         XSEGLOG2(&lc, D, "Handle received \n");
160         dispatch(peer, pr, req, dispatch_receive);
161
162 }
163
164 struct timeval sub_start, sub_end, sub_accum = {0, 0};
165 uint64_t submits = 0;
166 void get_submits_stats(){
167                 printf("Time waiting submit %lu.%06lu sec for %llu times.\n",
168                                 sub_accum.tv_sec, sub_accum.tv_usec, (long long unsigned int) submits);
169 }
170
171 int submit_peer_req(struct peerd *peer, struct peer_req *pr)
172 {
173         uint32_t ret;
174         struct xseg_request *req = pr->req;
175         // assert req->portno == peer->portno ?
176         XSEGLOG2 (&lc, D, "submitting peer req %u\n", (unsigned int)(pr - peer->peer_reqs));
177         ret = xseg_set_req_data(peer->xseg, req, (void *)(pr));
178         if (ret < 0)
179                 return -1;
180         //printf("pr: %x , req_data: %x \n", pr, xseg_get_req_data(peer->xseg, req));
181         //gettimeofday(&sub_start, NULL);
182         ret = xseg_submit(peer->xseg, req, pr->portno, X_ALLOC);
183         //gettimeofday(&sub_end, NULL);
184         //submits++;
185         //timersub(&sub_end, &sub_start, &sub_end);
186         //timeradd(&sub_end, &sub_accum, &sub_accum);
187         if (ret == NoPort)
188                 return -1;
189         xseg_signal(peer->xseg, ret);
190         return 0;
191 }
192
193 static int check_ports(struct peerd *peer)
194 {
195         struct xseg *xseg = peer->xseg;
196         xport portno_start = peer->portno_start;
197         xport portno_end = peer->portno_end;
198         struct xseg_request *accepted, *received;
199         struct peer_req *pr;
200         xport i;
201         int  r, c = 0;
202
203         for (i = portno_start; i <= portno_end; i++) {
204                 accepted = NULL;
205                 received = NULL;
206                 pr = alloc_peer_req(peer);
207                 if (pr) {
208                         accepted = xseg_accept(xseg, i, X_NONBLOCK);
209                         if (accepted) {
210                                 pr->req = accepted;
211                                 pr->portno = i;
212                                 xseg_cancel_wait(xseg, i);
213                                 handle_accepted(peer, pr, accepted);
214                                 c = 1;
215                         }
216                         else {
217                                 free_peer_req(peer, pr);
218                         }
219                 }
220                 received = xseg_receive(xseg, i, X_NONBLOCK);
221                 if (received) {
222                         r =  xseg_get_req_data(xseg, received, (void **) &pr);
223                         if (r < 0 || !pr){
224                                 XSEGLOG2(&lc, W, "Received request with no pr data\n");
225                                 xport p = xseg_respond(peer->xseg, received, peer->portno_start, X_ALLOC);
226                                 if (p == NoPort){
227                                         XSEGLOG2(&lc, W, "Could not respond stale request");
228                                         xseg_put_request(xseg, received, portno_start);
229                                         continue;
230                                 } else {
231                                         xseg_signal(xseg, p);
232                                 }
233                         } else {
234                                 //maybe perform sanity check for pr
235                                 xseg_cancel_wait(xseg, i);
236                                 handle_received(peer, pr, received);
237                                 c = 1;
238                         }
239                 }
240         }
241
242         return c;
243 }
244
245 static int peerd_loop(struct peerd *peer)
246 {
247         struct xseg *xseg = peer->xseg;
248         xport portno_start = peer->portno_start;
249         xport portno_end = peer->portno_end;
250         uint64_t threshold=1000/(1 + portno_end - portno_start);
251         pid_t pid =syscall(SYS_gettid);
252         uint64_t loops;
253         
254         XSEGLOG2(&lc, I, "Peer has tid %u.\n", pid);
255         xseg_init_local_signal(xseg, peer->portno_start);
256         for (;;) {
257                 for(loops= threshold; loops > 0; loops--) {
258                         if (loops == 1)
259                                 xseg_prepare_wait(xseg, peer->portno_start);
260                         if (check_ports(peer))
261                                 loops = threshold;
262                 }
263 #ifdef ST_THREADS
264                 if (ta){
265                         st_sleep(0);
266                 } else {
267 #endif
268                         XSEGLOG2(&lc, I, "Peer goes to sleep\n");
269                         xseg_wait_signal(xseg, 10000000UL);
270                         xseg_cancel_wait(xseg, peer->portno_start);
271                         XSEGLOG2(&lc, I, "Peer woke up\n");
272 #ifdef ST_THREADS
273                 }
274 #endif
275         }
276         xseg_quit_local_signal(xseg, peer->portno_start);
277         return 0;
278 }
279
280 void defer_request(struct peerd *peer, struct peer_req *pr)
281 {
282         // assert canDefer(peer);
283 //      xseg_submit(peer->xseg, peer->defer_portno, pr->req);
284 //      xseg_signal(peer->xseg, peer->defer_portno);
285 //      free_peer_req(peer, pr);
286 }
287
288 static struct xseg *join(char *spec)
289 {
290         struct xseg_config config;
291         struct xseg *xseg;
292
293         (void)xseg_parse_spec(spec, &config);
294         xseg = xseg_join(config.type, config.name, "posix", NULL);
295         if (xseg)
296                 return xseg;
297
298         (void)xseg_create(&config);
299         return xseg_join(config.type, config.name, "posix", NULL);
300 }
301
302 static struct peerd* peerd_init(uint32_t nr_ops, char* spec, long portno_start,
303                         long portno_end, uint32_t defer_portno)
304 {
305         int i;
306         struct peerd *peer;
307         struct xseg_port *port;
308
309 #ifdef ST_THREADS
310         st_init();
311 #endif
312         peer = malloc(sizeof(struct peerd));
313         if (!peer) {
314                 perror("malloc");
315                 return NULL;
316         }
317         peer->nr_ops = nr_ops;
318         peer->defer_portno = defer_portno;
319
320         peer->peer_reqs = calloc(nr_ops, sizeof(struct peer_req));
321         if (!peer->peer_reqs){
322 malloc_fail:
323                 perror("malloc");
324                 return NULL;
325         }
326
327         if (!xq_alloc_seq(&peer->free_reqs, nr_ops, nr_ops))
328                 goto malloc_fail;
329
330         if (xseg_initialize()){
331                 printf("cannot initialize library\n");
332                 return NULL;
333         }
334         peer->xseg = join(spec);
335         if (!peer->xseg) 
336                 return NULL;
337
338         peer->portno_start = (xport) portno_start;
339         peer->portno_end= (xport) portno_end;
340         port = xseg_bind_port(peer->xseg, peer->portno_start, NULL);
341         if (!port){
342                 printf("cannot bind to port %ld\n", peer->portno_start);
343                 return NULL;
344         }
345
346         xport p;
347         for (p = peer->portno_start + 1; p <= peer->portno_end; p++) {
348                 struct xseg_port *tmp;
349                 tmp = xseg_bind_port(peer->xseg, p, (void *)xseg_get_signal_desc(peer->xseg, port));
350                 if (!tmp){
351                         printf("cannot bind to port %ld\n", p);
352                         return NULL;
353                 }
354         }
355
356         printf("Peer on ports  %u-%u\n", peer->portno_start,
357                         peer->portno_end);
358
359         for (i = 0; i < nr_ops; i++) {
360                 peer->peer_reqs[i].peer = peer;
361                 peer->peer_reqs[i].req = NULL;
362                 peer->peer_reqs[i].retval = 0;
363                 peer->peer_reqs[i].priv = NULL;
364                 peer->peer_reqs[i].portno = NoPort;
365 #ifdef ST_THREADS
366                 peer->peer_reqs[i].cond = st_cond_new(); //FIXME err check
367 #endif
368         }
369         return peer;
370 }
371
372
373 int main(int argc, char *argv[])
374 {
375         struct peerd *peer = NULL;
376         //parse args
377         char *spec = "";
378         int i, r;
379         long portno_start = -1, portno_end = -1, portno = -1;;
380         //set defaults here
381         uint32_t nr_ops = 16;
382         unsigned int debug_level = 0;
383         uint32_t defer_portno = NoPort;
384         char *logfile = NULL;
385
386         //capture here -g spec, -n nr_ops, -p portno, -v verbose level
387         // -dp xseg_portno to defer blocking requests
388         // -l log file ?
389         //TODO print messages on arg parsing error
390         
391         for (i = 1; i < argc; i++) {
392                 if (!strcmp(argv[i], "-g") && i + 1 < argc) {
393                         spec = argv[i+1];
394                         i += 1;
395                         continue;
396                 }
397
398                 if (!strcmp(argv[i], "-sp") && i + 1 < argc) {
399                         portno_start = strtoul(argv[i+1], NULL, 10);
400                         i += 1;
401                         continue;
402                 }
403                 
404                 if (!strcmp(argv[i], "-ep") && i + 1 < argc) {
405                         portno_end = strtoul(argv[i+1], NULL, 10);
406                         i += 1;
407                         continue;
408                 }
409                 
410                 if (!strcmp(argv[i], "-p") && i + 1 < argc) {
411                         portno = strtoul(argv[i+1], NULL, 10);
412                         i += 1;
413                         continue;
414                 }
415
416                 if (!strcmp(argv[i], "-n") && i + 1 < argc) {
417                         nr_ops = strtoul(argv[i+1], NULL, 10);
418                         i += 1;
419                         continue;
420                 }
421                 if (!strcmp(argv[i], "-v") && i + 1 < argc ) {
422                         debug_level = atoi(argv[i+1]);
423                         i += 1;
424                         continue;
425                 }
426                 if (!strcmp(argv[i], "-dp") && i + 1 < argc ) {
427                         defer_portno = strtoul(argv[i+1], NULL, 10);
428                         i += 1;
429                         continue;
430                 }
431                 if (!strcmp(argv[i], "-l") && i + 1 < argc ) {
432                         logfile = argv[i+1];
433                         i += 1;
434                         continue;
435                 }
436
437         }
438         init_logctx(&lc, argv[0], debug_level, logfile);
439         //TODO perform argument sanity checks
440         verbose = debug_level;
441
442         if (portno != -1) {
443                 portno_start = portno;
444                 portno_end = portno;
445         }
446
447         //TODO err check
448         peer = peerd_init(nr_ops, spec, portno_start, portno_end, defer_portno);
449         if (!peer)
450                 return -1;
451         r = custom_peer_init(peer, argc, argv);
452         if (r < 0)
453                 return -1;
454 #ifdef ST_THREADS
455         st_thread_t st = st_thread_create(peerd_loop, peer, 1, 0);
456         return st_thread_join(st, NULL);
457 #else
458         return peerd_loop(peer);
459 #endif
460 }