2 * Copyright 2012 GRNET S.A. All rights reserved.
4 * Redistribution and use in source and binary forms, with or
5 * without modification, are permitted provided that the following
8 * 1. Redistributions of source code must retain the above
9 * copyright notice, this list of conditions and the following
11 * 2. Redistributions in binary form must reproduce the above
12 * copyright notice, this list of conditions and the following
13 * disclaimer in the documentation and/or other materials
14 * provided with the distribution.
16 * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17 * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19 * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20 * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23 * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24 * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26 * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27 * POSSIBILITY OF SUCH DAMAGE.
29 * The views and conclusions contained in the software and
30 * documentation are those of the authors and should not be
31 * interpreted as representing official policies, either expressed
32 * or implied, of GRNET S.A.
38 #include <sys/types.h>
40 #include <xseg/xseg.h>
42 #include <sys/syscall.h>
54 inline int canDefer(struct peerd *peer)
56 return !(peer->defer_portno == NoPort);
/*
 * Dump an xseg request to stdout for debugging.
 *
 * Fetches the request's target and data buffers with xseg_get_target() and
 * xseg_get_data(), copies a bounded prefix of each into local 64-byte
 * buffers and prints them together with op, offset, size, serviced, state,
 * the four port numbers, targetlen and datalen.
 *
 * NOTE(review): this excerpt looks incomplete. Braces are missing, no
 * NUL-termination of target[]/data[] is visible after the strncpy() calls,
 * and the format string has 14 conversions while only 13 arguments are
 * shown (nothing visible corresponds to "req id:%lu"). Confirm against the
 * full file before relying on this description.
 */
59 void print_req(struct xseg *xseg, struct xseg_request *req)
61 char target[64], data[64];
62 char *req_target, *req_data;
/* Copy at most 63 target bytes, so the copy always fits inside target[64]. */
63 unsigned int end = (req->targetlen> 63) ? 63 : req->targetlen;
64 req_target = xseg_get_target(xseg, req);
65 req_data = xseg_get_data(xseg, req);
68 strncpy(target, req_target, end);
/* NOTE(review): the bound here is a fixed 63; it is not derived from req->datalen. */
70 strncpy(data, req_data, 63);
72 printf("req id:%lu, op:%u %llu:%lu serviced: %lu, reqstate: %u\n"
73 "src: %u, st: %u, dst: %u dt: %u\n"
74 "target[%u]:'%s', data[%llu]:\n%s------------------\n\n",
76 (unsigned int)req->op,
77 (unsigned long long)req->offset,
78 (unsigned long)req->size,
79 (unsigned long)req->serviced,
80 (unsigned int)req->state,
81 (unsigned int)req->src_portno,
82 (unsigned int)req->src_transit_portno,
83 (unsigned int)req->dst_portno,
84 (unsigned int)req->dst_transit_portno,
85 (unsigned int)req->targetlen, target,
86 (unsigned long long)req->datalen, data);
/*
 * Print a tagged debug dump of a peer request to stdout.
 *
 * The dump is built from pr->req (op, offset, size, serviced, state,
 * targetlen, datalen, plus bounded copies of the target and data buffers),
 * from pr->retval, and from the index of pr within peer->peer_reqs, which
 * is printed as the request id.
 *
 * NOTE(review): this excerpt looks incomplete. Braces are missing, the
 * block comment inside the body is never closed, no NUL-termination of
 * target[]/data[] is visible, and the format string has 12 conversions
 * while only 11 arguments are shown (msg, presumably the value for the
 * leading "%s", is not among them). The data copy uses a fixed bound of 63
 * rather than pr->req->datalen. Confirm against the full file.
 */
90 void log_pr(char *msg, struct peer_req *pr)
92 char target[64], data[64];
93 char *req_target, *req_data;
94 struct peerd *peer = pr->peer;
95 struct xseg *xseg = pr->peer->xseg;
96 req_target = xseg_get_target(xseg, pr->req);
97 req_data = xseg_get_data(xseg, pr->req);
98 /* null terminate name in case of req->target is less than 63 characters,
99 * and next character after name (aka first byte of next buffer) is not
102 unsigned int end = (pr->req->targetlen> 63) ? 63 : pr->req->targetlen;
104 strncpy(target, req_target, end);
106 strncpy(data, req_data, 63);
108 printf("%s: req id:%u, op:%u %llu:%lu serviced: %lu, retval: %lu, reqstate: %u\n"
109 "target[%u]:'%s', data[%llu]:\n%s------------------\n\n",
111 (unsigned int)(pr - peer->peer_reqs),
112 (unsigned int)pr->req->op,
113 (unsigned long long)pr->req->offset,
114 (unsigned long)pr->req->size,
115 (unsigned long)pr->req->serviced,
116 (unsigned long)pr->retval,
117 (unsigned int)pr->req->state,
118 (unsigned int)pr->req->targetlen, target,
119 (unsigned long long)pr->req->datalen, data);
/*
 * Take a request slot from the peer's free queue.
 *
 * Pops an index from the head of peer->free_reqs and returns the address
 * of the matching entry in the peer->peer_reqs array.
 *
 * NOTE(review): lines appear to be missing between the pop and the return
 * in this excerpt, so no check of the popped index is visible here.
 * Callers should confirm how an exhausted queue is reported in the full
 * file.
 */
123 inline struct peer_req *alloc_peer_req(struct peerd *peer)
125 xqindex idx = xq_pop_head(&peer->free_reqs, 1);
128 return peer->peer_reqs + idx;
/*
 * Give a request slot back to the peer's free queue.
 *
 * The slot is identified by its index, computed as the distance of pr from
 * the start of peer->peer_reqs, and that index is handed to
 * xq_append_head() on peer->free_reqs.
 *
 * NOTE(review): one line appears to be missing between the two statements
 * in this excerpt; any reset of pr's fields would have to be confirmed
 * against the full file.
 */
131 inline void free_peer_req(struct peerd *peer, struct peer_req *pr)
/* pr must point into peer->peer_reqs for this subtraction to be meaningful. */
133 xqindex idx = pr - peer->peer_reqs;
135 xq_append_head(&peer->free_reqs, idx, 1);
/*
 * Respond-path statistics.
 *
 * resp_accum and responds are the values reported by get_responds_stats().
 * resp_start and resp_end are scratch timestamps; in this file they are
 * referenced only from timing code that is currently commented out.
 */
struct timeval resp_start, resp_end, resp_accum = {0, 0};
uint64_t responds = 0;

/*
 * Print the accumulated respond wait time and the respond counter to stdout.
 *
 * The timeval fields are cast to fixed C types before printing, because the
 * widths of time_t and suseconds_t are platform-defined and must match the
 * conversion specifiers exactly.
 */
void get_responds_stats(void)
{
	printf("Time waiting respond %lld.%06ld sec for %llu times.\n",
	       (long long)resp_accum.tv_sec,
	       (long)resp_accum.tv_usec,
	       (unsigned long long)responds);
}
146 void fail(struct peerd *peer, struct peer_req *pr)
148 struct xseg_request *req = pr->req;
150 XSEGLOG2(&lc, D, "failing req %u", (unsigned int) (pr - peer->peer_reqs));
151 req->state |= XS_FAILED;
152 //xseg_set_req_data(peer->xseg, pr->req, NULL);
153 p = xseg_respond(peer->xseg, req, pr->portno, X_ALLOC);
154 if (xseg_signal(peer->xseg, p) < 0)
155 XSEGLOG2(&lc, W, "Cannot signal portno %u", p);
156 free_peer_req(peer, pr);
/*
 * Complete a request successfully and release its peer_req slot.
 *
 * Sets XS_SERVED in the request state, responds to the request through
 * pr->portno with X_ALLOC, signals the port returned by xseg_respond()
 * (logging a warning if signalling returns a negative value) and then
 * returns pr to the peer's free list via free_peer_req().
 *
 * NOTE(review): the declaration of p and one line inside the commented-out
 * timing section are not visible in this excerpt; confirm against the full
 * file whether any statistics counter is updated here.
 */
160 void complete(struct peerd *peer, struct peer_req *pr)
162 struct xseg_request *req = pr->req;
164 req->state |= XS_SERVED;
165 //xseg_set_req_data(peer->xseg, pr->req, NULL);
166 //gettimeofday(&resp_start, NULL);
/* The result of xseg_respond() is passed straight to xseg_signal() below. */
167 p = xseg_respond(peer->xseg, req, pr->portno, X_ALLOC);
168 //gettimeofday(&resp_end, NULL);
170 //timersub(&resp_end, &resp_start, &resp_end);
171 //timeradd(&resp_end, &resp_accum, &resp_accum);
172 //printf("xseg_signal: %u\n", p);
173 if (xseg_signal(peer->xseg, p) < 0)
174 XSEGLOG2(&lc, W, "Cannot signal portno %u", p);
175 free_peer_req(peer, pr);
/*
 * Handle a request that was just accepted on one of the peer's ports.
 *
 * Logs the event at debug level and forwards peer, pr and req to
 * dispatch() with the dispatch_accept reason.
 *
 * NOTE(review): two lines between the log call and the dispatch are not
 * visible in this excerpt. xreq is initialised from pr->req but is not
 * used in the lines shown, so the missing lines presumably use it --
 * TODO confirm.
 */
178 static void handle_accepted(struct peerd *peer, struct peer_req *pr,
179 struct xseg_request *req)
181 struct xseg_request *xreq = pr->req;
182 //assert xreq == req;
183 XSEGLOG2(&lc, D, "Handle accepted");
186 dispatch(peer, pr, req, dispatch_accept);
189 static void handle_received(struct peerd *peer, struct peer_req *pr,
190 struct xseg_request *req)
192 //struct xseg_request *req = pr->req;
193 XSEGLOG2(&lc, D, "Handle received \n");
194 dispatch(peer, pr, req, dispatch_receive);
/*
 * Submit-path statistics.
 *
 * sub_accum and submits are the values reported by get_submits_stats().
 * sub_start and sub_end are scratch timestamps; in this file they are
 * referenced only from timing code that is currently commented out.
 */
struct timeval sub_start, sub_end, sub_accum = {0, 0};
uint64_t submits = 0;

/*
 * Print the accumulated submit wait time and the submit counter to stdout.
 *
 * The timeval fields are cast to fixed C types before printing, because the
 * widths of time_t and suseconds_t are platform-defined and must match the
 * conversion specifiers exactly.
 */
void get_submits_stats(void)
{
	printf("Time waiting submit %lld.%06ld sec for %llu times.\n",
	       (long long)sub_accum.tv_sec,
	       (long)sub_accum.tv_usec,
	       (unsigned long long)submits);
}
/*
 * Submit the xseg request owned by pr and signal the destination port.
 *
 * pr itself is attached to the request with xseg_set_req_data(); that is
 * the value check_ports() later reads back with xseg_get_req_data() when a
 * reply is received. The request is then submitted through pr->portno with
 * X_ALLOC, and the value returned by xseg_submit() is passed to
 * xseg_signal().
 *
 * NOTE(review): the declaration of ret, any checks on the two results
 * stored in it, and every return statement are not visible in this
 * excerpt, so the meaning of the int return value must be confirmed
 * against the full file.
 */
205 int submit_peer_req(struct peerd *peer, struct peer_req *pr)
208 struct xseg_request *req = pr->req;
209 // assert req->portno == peer->portno ?
210 XSEGLOG2 (&lc, D, "submitting peer req %u\n", (unsigned int)(pr - peer->peer_reqs));
211 ret = xseg_set_req_data(peer->xseg, req, (void *)(pr));
214 //printf("pr: %x , req_data: %x \n", pr, xseg_get_req_data(peer->xseg, req));
215 //gettimeofday(&sub_start, NULL);
/* ret is reused: from here on it holds the result of xseg_submit(). */
216 ret = xseg_submit(peer->xseg, req, pr->portno, X_ALLOC);
217 //gettimeofday(&sub_end, NULL);
219 //timersub(&sub_end, &sub_start, &sub_end);
220 //timeradd(&sub_end, &sub_accum, &sub_accum);
223 xseg_signal(peer->xseg, ret);
/*
 * Poll every port in [peer->portno_start, peer->portno_end] once for work.
 *
 * For each port the visible code allocates a peer_req, tries a
 * non-blocking xseg_accept() and hands an accepted request to
 * handle_accepted(); it then tries a non-blocking xseg_receive() and hands
 * a received request, together with the peer_req stored in its request
 * data, to handle_received().
 *
 * NOTE(review): the conditionals, the declarations of i, pr and r, and the
 * return statement are not visible in this excerpt, so the exact branch
 * structure and the meaning of the int result must be confirmed against
 * the full file. peerd_loop() tests the result as a boolean.
 */
227 static int check_ports(struct peerd *peer)
229 struct xseg *xseg = peer->xseg;
230 xport portno_start = peer->portno_start;
231 xport portno_end = peer->portno_end;
232 struct xseg_request *accepted, *received;
237 for (i = portno_start; i <= portno_end; i++) {
/* A slot is reserved before the accept is attempted. */
240 pr = alloc_peer_req(peer);
242 accepted = xseg_accept(xseg, i, X_NONBLOCK);
246 xseg_cancel_wait(xseg, i);
247 handle_accepted(peer, pr, accepted);
/* Presumably the path where nothing was accepted and the slot is unused -- TODO confirm. */
251 free_peer_req(peer, pr);
254 received = xseg_receive(xseg, i, X_NONBLOCK);
/* Read back the peer_req that submit_peer_req() attached to this request. */
256 r = xseg_get_req_data(xseg, received, (void **) &pr);
258 XSEGLOG2(&lc, W, "Received request with no pr data\n");
/* A request without peer_req data is answered on the first port instead of being dispatched. */
259 xport p = xseg_respond(peer->xseg, received, peer->portno_start, X_ALLOC);
261 XSEGLOG2(&lc, W, "Could not respond stale request");
/* Presumably the fallback taken when the respond above fails -- TODO confirm. */
262 xseg_put_request(xseg, received, portno_start);
265 xseg_signal(xseg, p);
268 //maybe perform sanity check for pr
269 xseg_cancel_wait(xseg, i);
270 handle_received(peer, pr, received);
/*
 * Service loop of the peer.
 *
 * Logs the thread id obtained with the gettid system call, sets up local
 * signalling on peer->portno_start, and then repeatedly prepares to wait
 * and polls the ports with check_ports() inside a loop bounded by
 * `threshold`. The visible code then logs that the peer goes to sleep,
 * blocks in xseg_wait_signal(), cancels the wait and logs the wake-up.
 * Local signalling is torn down with xseg_quit_local_signal() at the end.
 *
 * NOTE(review): the outer loop, the declaration of `loops`, the action
 * taken when check_ports() returns non-zero, and the return statement are
 * not visible in this excerpt.
 */
279 static int peerd_loop(struct peerd *peer)
281 struct xseg *xseg = peer->xseg;
282 xport portno_start = peer->portno_start;
283 xport portno_end = peer->portno_end;
/* Polling budget: 1000 divided by the number of ports in the inclusive range. */
284 uint64_t threshold=1000/(1 + portno_end - portno_start);
285 pid_t pid =syscall(SYS_gettid);
288 XSEGLOG2(&lc, I, "Peer has tid %u.\n", pid);
289 xseg_init_local_signal(xseg, peer->portno_start);
291 for(loops= threshold; loops > 0; loops--) {
293 xseg_prepare_wait(xseg, peer->portno_start);
294 if (check_ports(peer))
/* Presumably reached once the polling budget is used up without work -- TODO confirm. */
302 XSEGLOG2(&lc, I, "Peer goes to sleep\n");
/* The second argument looks like a timeout; its unit is not visible here -- TODO confirm. */
303 xseg_wait_signal(xseg, 10000000UL);
304 xseg_cancel_wait(xseg, peer->portno_start);
305 XSEGLOG2(&lc, I, "Peer woke up\n");
310 xseg_quit_local_signal(xseg, peer->portno_start);
/*
 * Defer a request to the peer's defer port.
 *
 * Not implemented: the function currently does nothing and does not touch
 * either argument. The steps sketched by the original author were:
 *
 *   assert canDefer(peer);
 *   xseg_submit(peer->xseg, peer->defer_portno, pr->req);
 *   xseg_signal(peer->xseg, peer->defer_portno);
 *   free_peer_req(peer, pr);
 */
void defer_request(struct peerd *peer, struct peer_req *pr)
{
	/* Parameters are intentionally unused until deferral is implemented. */
	(void)peer;
	(void)pr;
}
/*
 * Join the xseg segment described by spec, creating it when needed.
 *
 * Parses spec into an xseg_config and calls xseg_join() with config.type,
 * config.name and "posix". The remaining visible lines call xseg_create()
 * with the same config and return the result of a second xseg_join().
 *
 * NOTE(review): the declaration of `xseg` and whatever test is applied to
 * the first join's result are not visible in this excerpt; presumably the
 * create-and-rejoin path runs only when the first join fails -- TODO
 * confirm. The return values of xseg_parse_spec() and xseg_create() are
 * explicitly discarded with (void) casts.
 */
322 static struct xseg *join(char *spec)
324 struct xseg_config config;
327 (void)xseg_parse_spec(spec, &config);
328 xseg = xseg_join(config.type, config.name, "posix", NULL);
332 (void)xseg_create(&config);
333 return xseg_join(config.type, config.name, "posix", NULL);
/*
 * Allocate and initialise a peerd instance.
 *
 * Visible steps, in order: allocate the peerd structure; record nr_ops and
 * defer_portno; allocate a zeroed array of nr_ops peer_req entries; set up
 * the free_reqs queue with xq_alloc_seq(); initialise the xseg library;
 * join the segment described by spec; store the port range; bind the first
 * port and then every further port up to portno_end; finally initialise
 * each peer_req entry.
 *
 * NOTE(review): the error-handling bodies, the declarations of peer, i and
 * p, and the return statement are not visible in this excerpt, so cleanup
 * on failure and the exact return value must be confirmed against the full
 * file.
 */
336 static struct peerd* peerd_init(uint32_t nr_ops, char* spec, long portno_start,
337 long portno_end, uint32_t defer_portno)
341 struct xseg_port *port;
346 peer = malloc(sizeof(struct peerd));
351 peer->nr_ops = nr_ops;
352 peer->defer_portno = defer_portno;
354 peer->peer_reqs = calloc(nr_ops, sizeof(struct peer_req));
355 if (!peer->peer_reqs){
361 if (!xq_alloc_seq(&peer->free_reqs, nr_ops, nr_ops))
364 if (xseg_initialize()){
365 printf("cannot initialize library\n");
368 peer->xseg = join(spec);
/* The long arguments are narrowed to xport here without a range check. */
372 peer->portno_start = (xport) portno_start;
373 peer->portno_end= (xport) portno_end;
374 port = xseg_bind_port(peer->xseg, peer->portno_start, NULL);
/* NOTE(review): portno_start is printed with %ld here but with %u further down; one of the two does not match its type. */
376 printf("cannot bind to port %ld\n", peer->portno_start);
/* Every further port is bound with the signal descriptor obtained from the first port. */
381 for (p = peer->portno_start + 1; p <= peer->portno_end; p++) {
382 struct xseg_port *tmp;
383 tmp = xseg_bind_port(peer->xseg, p, (void *)xseg_get_signal_desc(peer->xseg, port));
385 printf("cannot bind to port %ld\n", p);
390 printf("Peer on ports %u-%u\n", peer->portno_start,
/* Give every request slot a back-pointer to the peer and clear its remaining fields. */
393 for (i = 0; i < nr_ops; i++) {
394 peer->peer_reqs[i].peer = peer;
395 peer->peer_reqs[i].req = NULL;
396 peer->peer_reqs[i].retval = 0;
397 peer->peer_reqs[i].priv = NULL;
398 peer->peer_reqs[i].portno = NoPort;
400 peer->peer_reqs[i].cond = st_cond_new(); //FIXME err check
/*
 * Program entry point: parse the command line, set up logging, build the
 * peer and run its service loop.
 *
 * Options recognised by the visible code (each takes one value):
 *   -g   segment spec (the assignment is not visible in this excerpt)
 *   -sp  first port number  -> portno_start
 *   -ep  last port number   -> portno_end
 *   -p   single port number -> portno
 *   -n   number of request slots -> nr_ops (default 16)
 *   -v   debug level        -> debug_level (default 0)
 *   -dp  defer port         -> defer_portno (default NoPort)
 *   -l   presumably the log file (the assignment is not visible) -- TODO confirm
 *
 * After parsing, the log context is initialised, the peer is created with
 * peerd_init(), custom_peer_init() is called with the peer and the original
 * argc/argv, and peerd_loop() is run either directly or inside a thread
 * created with st_thread_create().
 *
 * NOTE(review): option bodies, the declarations of i, r and spec, the
 * conditions around `portno_start = portno`, the checks on peer and r, and
 * whatever selects between the threaded and direct loop (presumably a
 * preprocessor conditional) are not visible in this excerpt.
 */
407 int main(int argc, char *argv[])
409 struct peerd *peer = NULL;
413 long portno_start = -1, portno_end = -1, portno = -1;;
415 uint32_t nr_ops = 16;
416 unsigned int debug_level = 0;
417 uint32_t defer_portno = NoPort;
418 char *logfile = NULL;
420 //capture here -g spec, -n nr_ops, -p portno, -v verbose level
421 // -dp xseg_portno to defer blocking requests
423 //TODO print messages on arg parsing error
/* Each option is only matched when a following argument exists (i + 1 < argc). */
425 for (i = 1; i < argc; i++) {
426 if (!strcmp(argv[i], "-g") && i + 1 < argc) {
432 if (!strcmp(argv[i], "-sp") && i + 1 < argc) {
/* Numeric values are converted without an end-pointer or errno check, so malformed input is not detected here. */
433 portno_start = strtoul(argv[i+1], NULL, 10);
438 if (!strcmp(argv[i], "-ep") && i + 1 < argc) {
439 portno_end = strtoul(argv[i+1], NULL, 10);
444 if (!strcmp(argv[i], "-p") && i + 1 < argc) {
445 portno = strtoul(argv[i+1], NULL, 10);
450 if (!strcmp(argv[i], "-n") && i + 1 < argc) {
451 nr_ops = strtoul(argv[i+1], NULL, 10);
455 if (!strcmp(argv[i], "-v") && i + 1 < argc ) {
456 debug_level = atoi(argv[i+1]);
460 if (!strcmp(argv[i], "-dp") && i + 1 < argc ) {
461 defer_portno = strtoul(argv[i+1], NULL, 10);
465 if (!strcmp(argv[i], "-l") && i + 1 < argc ) {
472 init_logctx(&lc, argv[0], debug_level, logfile);
473 //TODO perform argument sanity checks
474 verbose = debug_level;
/* Presumably applied when -p was given, so the single port becomes the range start -- TODO confirm. */
477 portno_start = portno;
482 peer = peerd_init(nr_ops, spec, portno_start, portno_end, defer_portno);
485 r = custom_peer_init(peer, argc, argv);
489 st_thread_t st = st_thread_create(peerd_loop, peer, 1, 0);
490 return st_thread_join(st, NULL);
492 return peerd_loop(peer);