add license headers
[archipelago] / xseg / peers / user / speer.c
1 /*
2  * Copyright 2012 GRNET S.A. All rights reserved.
3  *
4  * Redistribution and use in source and binary forms, with or
5  * without modification, are permitted provided that the following
6  * conditions are met:
7  *
8  *   1. Redistributions of source code must retain the above
9  *      copyright notice, this list of conditions and the following
10  *      disclaimer.
11  *   2. Redistributions in binary form must reproduce the above
12  *      copyright notice, this list of conditions and the following
13  *      disclaimer in the documentation and/or other materials
14  *      provided with the distribution.
15  *
16  * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17  * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18  * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19  * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20  * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23  * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24  * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25  * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26  * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27  * POSSIBILITY OF SUCH DAMAGE.
28  *
29  * The views and conclusions contained in the software and
30  * documentation are those of the authors and should not be
31  * interpreted as representing official policies, either expressed
32  * or implied, of GRNET S.A.
33  */
34
35 #define _GNU_SOURCE
36 #include <stdio.h>
37 #include <stdlib.h>
38 #include <sys/types.h>
39 #include <unistd.h>
40 #include <xseg/xseg.h>
41 #include <speer.h>
42 #include <sys/syscall.h>
43 #include <sys/time.h>
44 #include <signal.h>
45
46 #ifdef ST_THREADS
47 #include <st.h>
48 uint32_t ta = 0;
49 #endif
50
51 unsigned int verbose;
52 struct log_ctx lc;
53
54 inline int canDefer(struct peerd *peer)
55 {
56         return !(peer->defer_portno == NoPort);
57 }
58
59 void print_req(struct xseg *xseg, struct xseg_request *req)
60 {
61         char target[64], data[64];
62         char *req_target, *req_data;
63         unsigned int end = (req->targetlen> 63) ? 63 : req->targetlen;
64         req_target = xseg_get_target(xseg, req);
65         req_data = xseg_get_data(xseg, req);
66
67         if (1) {
68                 strncpy(target, req_target, end);
69                 target[end] = 0;
70                 strncpy(data, req_data, 63);
71                 data[63] = 0;
72                 printf("req id:%lu, op:%u %llu:%lu serviced: %lu, reqstate: %u\n"
73                                 "src: %u, st: %u, dst: %u dt: %u\n"
74                                 "target[%u]:'%s', data[%llu]:\n%s------------------\n\n",
75                                 (unsigned long)(req),
76                                 (unsigned int)req->op,
77                                 (unsigned long long)req->offset,
78                                 (unsigned long)req->size,
79                                 (unsigned long)req->serviced,
80                                 (unsigned int)req->state,
81                                 (unsigned int)req->src_portno,
82                                 (unsigned int)req->src_transit_portno,
83                                 (unsigned int)req->dst_portno,
84                                 (unsigned int)req->dst_transit_portno,
85                                 (unsigned int)req->targetlen, target,
86                                 (unsigned long long)req->datalen, data);
87         }
88 }
89
90 void log_pr(char *msg, struct peer_req *pr)
91 {
92         char target[64], data[64];
93         char *req_target, *req_data;
94         struct peerd *peer = pr->peer;
95         struct xseg *xseg = pr->peer->xseg;
96         req_target = xseg_get_target(xseg, pr->req);
97         req_data = xseg_get_data(xseg, pr->req);
98         /* null terminate name in case of req->target is less than 63 characters,
99          * and next character after name (aka first byte of next buffer) is not
100          * null
101          */
102         unsigned int end = (pr->req->targetlen> 63) ? 63 : pr->req->targetlen;
103         if (verbose) {
104                 strncpy(target, req_target, end);
105                 target[end] = 0;
106                 strncpy(data, req_data, 63);
107                 data[63] = 0;
108                 printf("%s: req id:%u, op:%u %llu:%lu serviced: %lu, retval: %lu, reqstate: %u\n"
109                                 "target[%u]:'%s', data[%llu]:\n%s------------------\n\n",
110                                 msg,
111                                 (unsigned int)(pr - peer->peer_reqs),
112                                 (unsigned int)pr->req->op,
113                                 (unsigned long long)pr->req->offset,
114                                 (unsigned long)pr->req->size,
115                                 (unsigned long)pr->req->serviced,
116                                 (unsigned long)pr->retval,
117                                 (unsigned int)pr->req->state,
118                                 (unsigned int)pr->req->targetlen, target,
119                                 (unsigned long long)pr->req->datalen, data);
120         }
121 }
122
123 inline struct peer_req *alloc_peer_req(struct peerd *peer)
124 {
125         xqindex idx = xq_pop_head(&peer->free_reqs, 1);
126         if (idx == Noneidx)
127                 return NULL;
128         return peer->peer_reqs + idx;
129 }
130
131 inline void free_peer_req(struct peerd *peer, struct peer_req *pr)
132 {
133         xqindex idx = pr - peer->peer_reqs;
134         pr->req = NULL;
135         xq_append_head(&peer->free_reqs, idx, 1);
136 }
137
138 struct timeval resp_start, resp_end, resp_accum = {0, 0};
139 uint64_t responds = 0;
140 void get_responds_stats(){
141                 printf("Time waiting respond %lu.%06lu sec for %llu times.\n",
142                                 resp_accum.tv_sec, resp_accum.tv_usec, (long long unsigned int) responds);
143 }
144
145 //FIXME error check
146 void fail(struct peerd *peer, struct peer_req *pr)
147 {
148         struct xseg_request *req = pr->req;
149         uint32_t p;
150         XSEGLOG2(&lc, D, "failing req %u", (unsigned int) (pr - peer->peer_reqs));
151         req->state |= XS_FAILED;
152         //xseg_set_req_data(peer->xseg, pr->req, NULL);
153         p = xseg_respond(peer->xseg, req, pr->portno, X_ALLOC);
154         if (xseg_signal(peer->xseg, p) < 0)
155                 XSEGLOG2(&lc, W, "Cannot signal portno %u", p);
156         free_peer_req(peer, pr);
157 }
158
159 //FIXME error check
160 void complete(struct peerd *peer, struct peer_req *pr)
161 {
162         struct xseg_request *req = pr->req;
163         uint32_t p;
164         req->state |= XS_SERVED;
165         //xseg_set_req_data(peer->xseg, pr->req, NULL);
166         //gettimeofday(&resp_start, NULL);
167         p = xseg_respond(peer->xseg, req, pr->portno, X_ALLOC);
168         //gettimeofday(&resp_end, NULL);
169         //responds++;
170         //timersub(&resp_end, &resp_start, &resp_end);
171         //timeradd(&resp_end, &resp_accum, &resp_accum);
172         //printf("xseg_signal: %u\n", p);
173         if (xseg_signal(peer->xseg, p) < 0)
174                 XSEGLOG2(&lc, W, "Cannot signal portno %u", p);
175         free_peer_req(peer, pr);
176 }
177
178 static void handle_accepted(struct peerd *peer, struct peer_req *pr, 
179                                 struct xseg_request *req)
180 {
181         struct xseg_request *xreq = pr->req;
182         //assert xreq == req;
183         XSEGLOG2(&lc, D, "Handle accepted");
184         xreq->serviced = 0;
185         pr->retval = 0;
186         dispatch(peer, pr, req, dispatch_accept);
187 }
188
189 static void handle_received(struct peerd *peer, struct peer_req *pr,
190                                 struct xseg_request *req)
191 {
192         //struct xseg_request *req = pr->req;
193         XSEGLOG2(&lc, D, "Handle received \n");
194         dispatch(peer, pr, req, dispatch_receive);
195
196 }
197
198 struct timeval sub_start, sub_end, sub_accum = {0, 0};
199 uint64_t submits = 0;
200 void get_submits_stats(){
201                 printf("Time waiting submit %lu.%06lu sec for %llu times.\n",
202                                 sub_accum.tv_sec, sub_accum.tv_usec, (long long unsigned int) submits);
203 }
204
205 int submit_peer_req(struct peerd *peer, struct peer_req *pr)
206 {
207         uint32_t ret;
208         struct xseg_request *req = pr->req;
209         // assert req->portno == peer->portno ?
210         XSEGLOG2 (&lc, D, "submitting peer req %u\n", (unsigned int)(pr - peer->peer_reqs));
211         ret = xseg_set_req_data(peer->xseg, req, (void *)(pr));
212         if (ret < 0)
213                 return -1;
214         //printf("pr: %x , req_data: %x \n", pr, xseg_get_req_data(peer->xseg, req));
215         //gettimeofday(&sub_start, NULL);
216         ret = xseg_submit(peer->xseg, req, pr->portno, X_ALLOC);
217         //gettimeofday(&sub_end, NULL);
218         //submits++;
219         //timersub(&sub_end, &sub_start, &sub_end);
220         //timeradd(&sub_end, &sub_accum, &sub_accum);
221         if (ret == NoPort)
222                 return -1;
223         xseg_signal(peer->xseg, ret);
224         return 0;
225 }
226
227 static int check_ports(struct peerd *peer)
228 {
229         struct xseg *xseg = peer->xseg;
230         xport portno_start = peer->portno_start;
231         xport portno_end = peer->portno_end;
232         struct xseg_request *accepted, *received;
233         struct peer_req *pr;
234         xport i;
235         int  r, c = 0;
236
237         for (i = portno_start; i <= portno_end; i++) {
238                 accepted = NULL;
239                 received = NULL;
240                 pr = alloc_peer_req(peer);
241                 if (pr) {
242                         accepted = xseg_accept(xseg, i, X_NONBLOCK);
243                         if (accepted) {
244                                 pr->req = accepted;
245                                 pr->portno = i;
246                                 xseg_cancel_wait(xseg, i);
247                                 handle_accepted(peer, pr, accepted);
248                                 c = 1;
249                         }
250                         else {
251                                 free_peer_req(peer, pr);
252                         }
253                 }
254                 received = xseg_receive(xseg, i, X_NONBLOCK);
255                 if (received) {
256                         r =  xseg_get_req_data(xseg, received, (void **) &pr);
257                         if (r < 0 || !pr){
258                                 XSEGLOG2(&lc, W, "Received request with no pr data\n");
259                                 xport p = xseg_respond(peer->xseg, received, peer->portno_start, X_ALLOC);
260                                 if (p == NoPort){
261                                         XSEGLOG2(&lc, W, "Could not respond stale request");
262                                         xseg_put_request(xseg, received, portno_start);
263                                         continue;
264                                 } else {
265                                         xseg_signal(xseg, p);
266                                 }
267                         } else {
268                                 //maybe perform sanity check for pr
269                                 xseg_cancel_wait(xseg, i);
270                                 handle_received(peer, pr, received);
271                                 c = 1;
272                         }
273                 }
274         }
275
276         return c;
277 }
278
279 static int peerd_loop(struct peerd *peer)
280 {
281         struct xseg *xseg = peer->xseg;
282         xport portno_start = peer->portno_start;
283         xport portno_end = peer->portno_end;
284         uint64_t threshold=1000/(1 + portno_end - portno_start);
285         pid_t pid =syscall(SYS_gettid);
286         uint64_t loops;
287         
288         XSEGLOG2(&lc, I, "Peer has tid %u.\n", pid);
289         xseg_init_local_signal(xseg, peer->portno_start);
290         for (;;) {
291                 for(loops= threshold; loops > 0; loops--) {
292                         if (loops == 1)
293                                 xseg_prepare_wait(xseg, peer->portno_start);
294                         if (check_ports(peer))
295                                 loops = threshold;
296                 }
297 #ifdef ST_THREADS
298                 if (ta){
299                         st_sleep(0);
300                 } else {
301 #endif
302                         XSEGLOG2(&lc, I, "Peer goes to sleep\n");
303                         xseg_wait_signal(xseg, 10000000UL);
304                         xseg_cancel_wait(xseg, peer->portno_start);
305                         XSEGLOG2(&lc, I, "Peer woke up\n");
306 #ifdef ST_THREADS
307                 }
308 #endif
309         }
310         xseg_quit_local_signal(xseg, peer->portno_start);
311         return 0;
312 }
313
314 void defer_request(struct peerd *peer, struct peer_req *pr)
315 {
316         // assert canDefer(peer);
317 //      xseg_submit(peer->xseg, peer->defer_portno, pr->req);
318 //      xseg_signal(peer->xseg, peer->defer_portno);
319 //      free_peer_req(peer, pr);
320 }
321
322 static struct xseg *join(char *spec)
323 {
324         struct xseg_config config;
325         struct xseg *xseg;
326
327         (void)xseg_parse_spec(spec, &config);
328         xseg = xseg_join(config.type, config.name, "posix", NULL);
329         if (xseg)
330                 return xseg;
331
332         (void)xseg_create(&config);
333         return xseg_join(config.type, config.name, "posix", NULL);
334 }
335
336 static struct peerd* peerd_init(uint32_t nr_ops, char* spec, long portno_start,
337                         long portno_end, uint32_t defer_portno)
338 {
339         int i;
340         struct peerd *peer;
341         struct xseg_port *port;
342
343 #ifdef ST_THREADS
344         st_init();
345 #endif
346         peer = malloc(sizeof(struct peerd));
347         if (!peer) {
348                 perror("malloc");
349                 return NULL;
350         }
351         peer->nr_ops = nr_ops;
352         peer->defer_portno = defer_portno;
353
354         peer->peer_reqs = calloc(nr_ops, sizeof(struct peer_req));
355         if (!peer->peer_reqs){
356 malloc_fail:
357                 perror("malloc");
358                 return NULL;
359         }
360
361         if (!xq_alloc_seq(&peer->free_reqs, nr_ops, nr_ops))
362                 goto malloc_fail;
363
364         if (xseg_initialize()){
365                 printf("cannot initialize library\n");
366                 return NULL;
367         }
368         peer->xseg = join(spec);
369         if (!peer->xseg) 
370                 return NULL;
371
372         peer->portno_start = (xport) portno_start;
373         peer->portno_end= (xport) portno_end;
374         port = xseg_bind_port(peer->xseg, peer->portno_start, NULL);
375         if (!port){
376                 printf("cannot bind to port %ld\n", peer->portno_start);
377                 return NULL;
378         }
379
380         xport p;
381         for (p = peer->portno_start + 1; p <= peer->portno_end; p++) {
382                 struct xseg_port *tmp;
383                 tmp = xseg_bind_port(peer->xseg, p, (void *)xseg_get_signal_desc(peer->xseg, port));
384                 if (!tmp){
385                         printf("cannot bind to port %ld\n", p);
386                         return NULL;
387                 }
388         }
389
390         printf("Peer on ports  %u-%u\n", peer->portno_start,
391                         peer->portno_end);
392
393         for (i = 0; i < nr_ops; i++) {
394                 peer->peer_reqs[i].peer = peer;
395                 peer->peer_reqs[i].req = NULL;
396                 peer->peer_reqs[i].retval = 0;
397                 peer->peer_reqs[i].priv = NULL;
398                 peer->peer_reqs[i].portno = NoPort;
399 #ifdef ST_THREADS
400                 peer->peer_reqs[i].cond = st_cond_new(); //FIXME err check
401 #endif
402         }
403         return peer;
404 }
405
406
407 int main(int argc, char *argv[])
408 {
409         struct peerd *peer = NULL;
410         //parse args
411         char *spec = "";
412         int i, r;
413         long portno_start = -1, portno_end = -1, portno = -1;;
414         //set defaults here
415         uint32_t nr_ops = 16;
416         unsigned int debug_level = 0;
417         uint32_t defer_portno = NoPort;
418         char *logfile = NULL;
419
420         //capture here -g spec, -n nr_ops, -p portno, -v verbose level
421         // -dp xseg_portno to defer blocking requests
422         // -l log file ?
423         //TODO print messages on arg parsing error
424         
425         for (i = 1; i < argc; i++) {
426                 if (!strcmp(argv[i], "-g") && i + 1 < argc) {
427                         spec = argv[i+1];
428                         i += 1;
429                         continue;
430                 }
431
432                 if (!strcmp(argv[i], "-sp") && i + 1 < argc) {
433                         portno_start = strtoul(argv[i+1], NULL, 10);
434                         i += 1;
435                         continue;
436                 }
437                 
438                 if (!strcmp(argv[i], "-ep") && i + 1 < argc) {
439                         portno_end = strtoul(argv[i+1], NULL, 10);
440                         i += 1;
441                         continue;
442                 }
443                 
444                 if (!strcmp(argv[i], "-p") && i + 1 < argc) {
445                         portno = strtoul(argv[i+1], NULL, 10);
446                         i += 1;
447                         continue;
448                 }
449
450                 if (!strcmp(argv[i], "-n") && i + 1 < argc) {
451                         nr_ops = strtoul(argv[i+1], NULL, 10);
452                         i += 1;
453                         continue;
454                 }
455                 if (!strcmp(argv[i], "-v") && i + 1 < argc ) {
456                         debug_level = atoi(argv[i+1]);
457                         i += 1;
458                         continue;
459                 }
460                 if (!strcmp(argv[i], "-dp") && i + 1 < argc ) {
461                         defer_portno = strtoul(argv[i+1], NULL, 10);
462                         i += 1;
463                         continue;
464                 }
465                 if (!strcmp(argv[i], "-l") && i + 1 < argc ) {
466                         logfile = argv[i+1];
467                         i += 1;
468                         continue;
469                 }
470
471         }
472         init_logctx(&lc, argv[0], debug_level, logfile);
473         //TODO perform argument sanity checks
474         verbose = debug_level;
475
476         if (portno != -1) {
477                 portno_start = portno;
478                 portno_end = portno;
479         }
480
481         //TODO err check
482         peer = peerd_init(nr_ops, spec, portno_start, portno_end, defer_portno);
483         if (!peer)
484                 return -1;
485         r = custom_peer_init(peer, argc, argv);
486         if (r < 0)
487                 return -1;
488 #ifdef ST_THREADS
489         st_thread_t st = st_thread_create(peerd_loop, peer, 1, 0);
490         return st_thread_join(st, NULL);
491 #else
492         return peerd_loop(peer);
493 #endif
494 }