fix wrong header file in mt-vlmcd
[archipelago] / xseg / peers / user / speer.c
#define _GNU_SOURCE
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/types.h>
#include <unistd.h>
#include <xseg/xseg.h>
#include <speer.h>
#include <sys/syscall.h>
#include <sys/time.h>
#include <signal.h>
11
12
/* Verbosity level: main() copies the -v value here and log_pr() prints only
 * when it is non-zero. */
unsigned int verbose;
/* Logging context handed to every XSEGLOG2() call; set up by init_logctx()
 * in main(). */
struct log_ctx lc;
15
16 inline int canDefer(struct peerd *peer)
17 {
18         return !(peer->defer_portno == NoPort);
19 }
20
21 void print_req(struct xseg *xseg, struct xseg_request *req)
22 {
23         char target[64], data[64];
24         char *req_target, *req_data;
25         unsigned int end = (req->targetlen> 63) ? 63 : req->targetlen;
26         req_target = xseg_get_target(xseg, req);
27         req_data = xseg_get_data(xseg, req);
28
29         if (1) {
30                 strncpy(target, req_target, end);
31                 target[end] = 0;
32                 strncpy(data, req_data, 63);
33                 data[63] = 0;
34                 printf("req id:%lu, op:%u %llu:%lu serviced: %lu, reqstate: %u\n"
35                                 "src: %u, st: %u, dst: %u dt: %u\n"
36                                 "target[%u]:'%s', data[%llu]:\n%s------------------\n\n",
37                                 (unsigned long)(req),
38                                 (unsigned int)req->op,
39                                 (unsigned long long)req->offset,
40                                 (unsigned long)req->size,
41                                 (unsigned long)req->serviced,
42                                 (unsigned int)req->state,
43                                 (unsigned int)req->src_portno,
44                                 (unsigned int)req->src_transit_portno,
45                                 (unsigned int)req->dst_portno,
46                                 (unsigned int)req->dst_transit_portno,
47                                 (unsigned int)req->targetlen, target,
48                                 (unsigned long long)req->datalen, data);
49         }
50 }
51
52 void log_pr(char *msg, struct peer_req *pr)
53 {
54         char target[64], data[64];
55         char *req_target, *req_data;
56         struct peerd *peer = pr->peer;
57         struct xseg *xseg = pr->peer->xseg;
58         req_target = xseg_get_target(xseg, pr->req);
59         req_data = xseg_get_data(xseg, pr->req);
60         /* null terminate name in case of req->target is less than 63 characters,
61          * and next character after name (aka first byte of next buffer) is not
62          * null
63          */
64         unsigned int end = (pr->req->targetlen> 63) ? 63 : pr->req->targetlen;
65         if (verbose) {
66                 strncpy(target, req_target, end);
67                 target[end] = 0;
68                 strncpy(data, req_data, 63);
69                 data[63] = 0;
70                 printf("%s: req id:%u, op:%u %llu:%lu serviced: %lu, retval: %lu, reqstate: %u\n"
71                                 "target[%u]:'%s', data[%llu]:\n%s------------------\n\n",
72                                 msg,
73                                 (unsigned int)(pr - peer->peer_reqs),
74                                 (unsigned int)pr->req->op,
75                                 (unsigned long long)pr->req->offset,
76                                 (unsigned long)pr->req->size,
77                                 (unsigned long)pr->req->serviced,
78                                 (unsigned long)pr->retval,
79                                 (unsigned int)pr->req->state,
80                                 (unsigned int)pr->req->targetlen, target,
81                                 (unsigned long long)pr->req->datalen, data);
82         }
83 }
84
85 inline struct peer_req *alloc_peer_req(struct peerd *peer)
86 {
87         xqindex idx = xq_pop_head(&peer->free_reqs, 1);
88         if (idx == Noneidx)
89                 return NULL;
90         return peer->peer_reqs + idx;
91 }
92
93 inline void free_peer_req(struct peerd *peer, struct peer_req *pr)
94 {
95         xqindex idx = pr - peer->peer_reqs;
96         pr->req = NULL;
97         xq_append_head(&peer->free_reqs, idx, 1);
98 }
99
/*
 * Bookkeeping for timing responds: start/end stamps of one measurement and
 * the accumulated total.  NOTE(review): the code that would update these
 * appears to be disabled at present - confirm before relying on the numbers.
 */
struct timeval resp_start, resp_end, resp_accum = {0, 0};
/* Number of measured responds. */
uint64_t responds = 0;

/*
 * Print the accumulated respond time (seconds.microseconds) and the number
 * of measured responds to stdout.
 *
 * tv_sec and tv_usec are cast explicitly because their underlying types
 * (time_t, suseconds_t) are not guaranteed to match %lu.
 */
void get_responds_stats(void)
{
        printf("Time waiting respond %lu.%06lu sec for %llu times.\n",
                        (unsigned long)resp_accum.tv_sec,
                        (unsigned long)resp_accum.tv_usec,
                        (unsigned long long)responds);
}
106
107 //FIXME error check
108 void fail(struct peerd *peer, struct peer_req *pr)
109 {
110         struct xseg_request *req = pr->req;
111         uint32_t p;
112         XSEGLOG2(&lc, D, "failing req %u", (unsigned int) (pr - peer->peer_reqs));
113         req->state |= XS_FAILED;
114         //xseg_set_req_data(peer->xseg, pr->req, NULL);
115         p = xseg_respond(peer->xseg, req, pr->portno, X_ALLOC);
116         if (xseg_signal(peer->xseg, p) < 0)
117                 XSEGLOG2(&lc, W, "Cannot signal portno %u", p);
118         free_peer_req(peer, pr);
119 }
120
121 //FIXME error check
122 void complete(struct peerd *peer, struct peer_req *pr)
123 {
124         struct xseg_request *req = pr->req;
125         uint32_t p;
126         req->state |= XS_SERVED;
127         //xseg_set_req_data(peer->xseg, pr->req, NULL);
128         //gettimeofday(&resp_start, NULL);
129         p = xseg_respond(peer->xseg, req, pr->portno, X_ALLOC);
130         //gettimeofday(&resp_end, NULL);
131         //responds++;
132         //timersub(&resp_end, &resp_start, &resp_end);
133         //timeradd(&resp_end, &resp_accum, &resp_accum);
134         //printf("xseg_signal: %u\n", p);
135         if (xseg_signal(peer->xseg, p) < 0)
136                 XSEGLOG2(&lc, W, "Cannot signal portno %u", p);
137         free_peer_req(peer, pr);
138 }
139
140 void pending(struct peerd *peer, struct peer_req *pr)
141 {
142                 pr->req->state = XS_PENDING;
143 }
144
145 static void handle_accepted(struct peerd *peer, struct peer_req *pr, 
146                                 struct xseg_request *req)
147 {
148         struct xseg_request *xreq = pr->req;
149         //assert xreq == req;
150         XSEGLOG2(&lc, D, "Handle accepted");
151         xreq->serviced = 0;
152         //xreq->state = XS_ACCEPTED;
153         pr->retval = 0;
154         dispatch(peer, pr, req);
155 }
156
157 static void handle_received(struct peerd *peer, struct peer_req *pr,
158                                 struct xseg_request *req)
159 {
160         //struct xseg_request *req = pr->req;
161         //assert req->state != XS_ACCEPTED;
162         XSEGLOG2(&lc, D, "Handle received \n");
163         dispatch(peer, pr, req);
164
165 }
166
/*
 * Bookkeeping for timing submits: start/end stamps of one measurement and
 * the accumulated total.  NOTE(review): the code that would update these
 * appears to be disabled at present - confirm before relying on the numbers.
 */
struct timeval sub_start, sub_end, sub_accum = {0, 0};
/* Number of measured submits. */
uint64_t submits = 0;

/*
 * Print the accumulated submit time (seconds.microseconds) and the number
 * of measured submits to stdout.
 *
 * tv_sec and tv_usec are cast explicitly because their underlying types
 * (time_t, suseconds_t) are not guaranteed to match %lu.
 */
void get_submits_stats(void)
{
        printf("Time waiting submit %lu.%06lu sec for %llu times.\n",
                        (unsigned long)sub_accum.tv_sec,
                        (unsigned long)sub_accum.tv_usec,
                        (unsigned long long)submits);
}
173
174 int submit_peer_req(struct peerd *peer, struct peer_req *pr)
175 {
176         uint32_t ret;
177         struct xseg_request *req = pr->req;
178         // assert req->portno == peer->portno ?
179         XSEGLOG2 (&lc, D, "submitting peer req %u\n", (unsigned int)(pr - peer->peer_reqs));
180         ret = xseg_set_req_data(peer->xseg, req, (void *)(pr));
181         if (ret < 0)
182                 return -1;
183         //printf("pr: %x , req_data: %x \n", pr, xseg_get_req_data(peer->xseg, req));
184         //gettimeofday(&sub_start, NULL);
185         ret = xseg_submit(peer->xseg, req, pr->portno, X_ALLOC);
186         //gettimeofday(&sub_end, NULL);
187         //submits++;
188         //timersub(&sub_end, &sub_start, &sub_end);
189         //timeradd(&sub_end, &sub_accum, &sub_accum);
190         if (ret == NoPort)
191                 return -1;
192         xseg_signal(peer->xseg, ret);
193         return 0;
194 }
195
196 static int check_ports(struct peerd *peer)
197 {
198         struct xseg *xseg = peer->xseg;
199         xport portno_start = peer->portno_start;
200         xport portno_end = peer->portno_end;
201         struct xseg_request *accepted, *received;
202         struct peer_req *pr;
203         xport i;
204         int  r, c = 0;
205
206         for (i = portno_start; i <= portno_end; i++) {
207                 accepted = NULL;
208                 received = NULL;
209                 pr = alloc_peer_req(peer);
210                 if (pr) {
211                         accepted = xseg_accept(xseg, i, X_NONBLOCK);
212                         if (accepted) {
213                                 pr->req = accepted;
214                                 pr->portno = i;
215                                 xseg_cancel_wait(xseg, i);
216                                 handle_accepted(peer, pr, accepted);
217                                 c = 1;
218                         }
219                         else {
220                                 free_peer_req(peer, pr);
221                         }
222                 }
223                 received = xseg_receive(xseg, i, X_NONBLOCK);
224                 if (received) {
225                         r =  xseg_get_req_data(xseg, received, (void **) &pr);
226                         if (r < 0 || !pr){
227                                 XSEGLOG2(&lc, W, "Received request with no pr data\n");
228                                 xport p = xseg_respond(peer->xseg, received, peer->portno_start, X_ALLOC);
229                                 if (p == NoPort){
230                                         XSEGLOG2(&lc, W, "Could not respond stale request");
231                                         xseg_put_request(xseg, received, portno_start);
232                                         continue;
233                                 } else {
234                                         xseg_signal(xseg, p);
235                                 }
236                         } else {
237                                 //maybe perform sanity check for pr
238                                 xseg_cancel_wait(xseg, i);
239                                 handle_received(peer, pr, received);
240                                 c = 1;
241                         }
242                 }
243         }
244
245         return c;
246 }
247
248 static int peerd_loop(struct peerd *peer)
249 {
250         struct xseg *xseg = peer->xseg;
251         xport portno_start = peer->portno_start;
252         xport portno_end = peer->portno_end;
253         uint64_t threshold=1000/(portno_end - portno_start);
254         pid_t pid =syscall(SYS_gettid);
255         uint64_t loops;
256         
257         XSEGLOG2(&lc, I, "Peer has tid %u.\n", pid);
258         xseg_init_local_signal(xseg, peer->portno_start);
259         for (;;) {
260                 for(loops= threshold; loops > 0; loops--) {
261                         if (loops == 1)
262                                 xseg_prepare_wait(xseg, peer->portno_start);
263                         if (check_ports(peer))
264                                 loops = threshold;
265                 }
266                 XSEGLOG2(&lc, I, "Peer goes to sleep\n");
267                 xseg_wait_signal(xseg, 10000000UL);
268                 xseg_cancel_wait(xseg, peer->portno_start);
269                 XSEGLOG2(&lc, I, "Peer woke up\n");
270         }
271         xseg_quit_local_signal(xseg, peer->portno_start);
272         return 0;
273 }
274
/*
 * Placeholder for handing a request over to the peer's defer port.
 *
 * Currently does nothing.  The disabled implementation submitted pr->req to
 * peer->defer_portno, signalled that port and released the peer request:
 *
 *      xseg_submit(peer->xseg, peer->defer_portno, pr->req);
 *      xseg_signal(peer->xseg, peer->defer_portno);
 *      free_peer_req(peer, pr);
 */
void defer_request(struct peerd *peer, struct peer_req *pr)
{
        // assert canDefer(peer);
        (void)peer;
        (void)pr;
}
282
283 static struct xseg *join(char *spec)
284 {
285         struct xseg_config config;
286         struct xseg *xseg;
287
288         (void)xseg_parse_spec(spec, &config);
289         xseg = xseg_join(config.type, config.name, "posix", NULL);
290         if (xseg)
291                 return xseg;
292
293         (void)xseg_create(&config);
294         return xseg_join(config.type, config.name, "posix", NULL);
295 }
296
297 static struct peerd* peerd_init(uint32_t nr_ops, char* spec, long portno_start,
298                         long portno_end, uint32_t defer_portno)
299 {
300         int i;
301         struct peerd *peer;
302         struct xseg_port *port;
303         peer = malloc(sizeof(struct peerd));
304         if (!peer) {
305                 perror("malloc");
306                 return NULL;
307         }
308         peer->nr_ops = nr_ops;
309         peer->defer_portno = defer_portno;
310
311         peer->peer_reqs = calloc(nr_ops, sizeof(struct peer_req));
312         if (!peer->peer_reqs){
313 malloc_fail:
314                 perror("malloc");
315                 return NULL;
316         }
317
318         if (!xq_alloc_seq(&peer->free_reqs, nr_ops, nr_ops))
319                 goto malloc_fail;
320
321         if (xseg_initialize()){
322                 printf("cannot initialize library\n");
323                 return NULL;
324         }
325         peer->xseg = join(spec);
326         if (!peer->xseg) 
327                 return NULL;
328
329         peer->portno_start = (xport) portno_start;
330         peer->portno_end= (xport) portno_end;
331         port = xseg_bind_port(peer->xseg, peer->portno_start, NULL);
332         if (!port){
333                 printf("cannot bind to port %ld\n", peer->portno_start);
334                 return NULL;
335         }
336
337         xport p;
338         for (p = peer->portno_start + 1; p <= peer->portno_end; p++) {
339                 struct xseg_port *tmp;
340                 tmp = xseg_bind_port(peer->xseg, p, (void *)xseg_get_signal_desc(peer->xseg, port));
341                 if (!tmp){
342                         printf("cannot bind to port %ld\n", p);
343                         return NULL;
344                 }
345         }
346
347         printf("Peer on ports  %u-%u\n", peer->portno_start,
348                         peer->portno_end);
349
350         for (i = 0; i < nr_ops; i++) {
351                 peer->peer_reqs[i].peer = peer;
352                 peer->peer_reqs[i].req = NULL;
353                 peer->peer_reqs[i].retval = 0;
354                 peer->peer_reqs[i].priv = NULL;
355                 peer->peer_reqs[i].portno = NoPort;
356         }
357         return peer;
358 }
359
360
361 int main(int argc, char *argv[])
362 {
363         struct peerd *peer = NULL;
364         //parse args
365         char *spec = "";
366         int i, r;
367         long portno_start = -1, portno_end = -1, portno = -1;;
368         //set defaults here
369         uint32_t nr_ops = 16;
370         unsigned int debug_level = 0;
371         uint32_t defer_portno = NoPort;
372         char *logfile = NULL;
373
374         //capture here -g spec, -n nr_ops, -p portno, -v verbose level
375         // -dp xseg_portno to defer blocking requests
376         // -l log file ?
377         //TODO print messages on arg parsing error
378         
379         for (i = 1; i < argc; i++) {
380                 if (!strcmp(argv[i], "-g") && i + 1 < argc) {
381                         spec = argv[i+1];
382                         i += 1;
383                         continue;
384                 }
385
386                 if (!strcmp(argv[i], "-sp") && i + 1 < argc) {
387                         portno_start = strtoul(argv[i+1], NULL, 10);
388                         i += 1;
389                         continue;
390                 }
391                 
392                 if (!strcmp(argv[i], "-ep") && i + 1 < argc) {
393                         portno_end = strtoul(argv[i+1], NULL, 10);
394                         i += 1;
395                         continue;
396                 }
397                 
398                 if (!strcmp(argv[i], "-p") && i + 1 < argc) {
399                         portno = strtoul(argv[i+1], NULL, 10);
400                         i += 1;
401                         continue;
402                 }
403
404                 if (!strcmp(argv[i], "-n") && i + 1 < argc) {
405                         nr_ops = strtoul(argv[i+1], NULL, 10);
406                         i += 1;
407                         continue;
408                 }
409                 if (!strcmp(argv[i], "-v") && i + 1 < argc ) {
410                         debug_level = atoi(argv[i+1]);
411                         i += 1;
412                         continue;
413                 }
414                 if (!strcmp(argv[i], "-dp") && i + 1 < argc ) {
415                         defer_portno = strtoul(argv[i+1], NULL, 10);
416                         i += 1;
417                         continue;
418                 }
419                 if (!strcmp(argv[i], "-l") && i + 1 < argc ) {
420                         logfile = argv[i+1];
421                         i += 1;
422                         continue;
423                 }
424
425         }
426         init_logctx(&lc, argv[0], debug_level, logfile);
427         //TODO perform argument sanity checks
428         verbose = debug_level;
429
430         if (portno != -1) {
431                 portno_start = portno;
432                 portno_end = portno;
433         }
434
435         //TODO err check
436         peer = peerd_init(nr_ops, spec, portno_start, portno_end, defer_portno);
437         if (!peer)
438                 return -1;
439         r = custom_peer_init(peer, argc, argv);
440         if (r < 0)
441                 return -1;
442         return peerd_loop(peer);
443 }