8 #include <sys/syscall.h>
16 inline int canDefer(struct peerd *peer)
18 return !(peer->defer_portno == NoPort);
/*
 * Dump a request to stdout: op, offset, size, serviced, state, the source and
 * destination (transit) port numbers, and up to 63 bytes each of the request's
 * target name and data.
 *
 * xseg: segment used to look up the request's target and data buffers.
 * req:  request to print.
 */
21 void print_req(struct xseg *xseg, struct xseg_request *req)
23 char target[64], data[64];
24 char *req_target, *req_data;
// Clamp the number of target bytes copied to 63 so target[] keeps room for a terminator.
25 unsigned int end = (req->targetlen> 63) ? 63 : req->targetlen;
26 req_target = xseg_get_target(xseg, req);
27 req_data = xseg_get_data(xseg, req);
// NOTE(review): strncpy() writes no terminating NUL when the source has none
// within the given length; target and data must be NUL-terminated before the
// %s conversions below - confirm that this happens.
30 strncpy(target, req_target, end);
// The data copy is bounded by 63, not by req->datalen.
32 strncpy(data, req_data, 63);
// NOTE(review): the format string has 14 conversions; confirm which argument
// is bound to the leading "req id:%lu".
34 printf("req id:%lu, op:%u %llu:%lu serviced: %lu, reqstate: %u\n"
35 "src: %u, st: %u, dst: %u dt: %u\n"
36 "target[%u]:'%s', data[%llu]:\n%s------------------\n\n",
38 (unsigned int)req->op,
39 (unsigned long long)req->offset,
40 (unsigned long)req->size,
41 (unsigned long)req->serviced,
42 (unsigned int)req->state,
43 (unsigned int)req->src_portno,
44 (unsigned int)req->src_transit_portno,
45 (unsigned int)req->dst_portno,
46 (unsigned int)req->dst_transit_portno,
47 (unsigned int)req->targetlen, target,
48 (unsigned long long)req->datalen, data);
/*
 * Print a peer request to stdout: the request's index within
 * peer->peer_reqs, op, offset, size, serviced, the peer request's retval,
 * state, and up to 63 bytes each of the target name and data.
 *
 * msg: presumably bound to the leading "%s:" of the format string - confirm.
 * pr:  peer request to print; its peer and xseg request are read through it.
 *
 * NOTE(review): strncpy() writes no terminating NUL when the source has none
 * within the given length; confirm target and data are NUL-terminated before
 * they are printed with %s. The data copy is bounded by 63, not by
 * pr->req->datalen.
 */
52 void log_pr(char *msg, struct peer_req *pr)
54 char target[64], data[64];
55 char *req_target, *req_data;
56 struct peerd *peer = pr->peer;
57 struct xseg *xseg = pr->peer->xseg;
58 req_target = xseg_get_target(xseg, pr->req);
59 req_data = xseg_get_data(xseg, pr->req);
// NOTE(review): the block comment that starts on the next line has no visible
// closing delimiter here - confirm it is terminated before the declaration of `end`.
60 /* null terminate name in case of req->target is less than 63 characters,
61 * and next character after name (aka first byte of next buffer) is not
64 unsigned int end = (pr->req->targetlen> 63) ? 63 : pr->req->targetlen;
66 strncpy(target, req_target, end);
68 strncpy(data, req_data, 63);
70 printf("%s: req id:%u, op:%u %llu:%lu serviced: %lu, retval: %lu, reqstate: %u\n"
71 "target[%u]:'%s', data[%llu]:\n%s------------------\n\n",
73 (unsigned int)(pr - peer->peer_reqs),
74 (unsigned int)pr->req->op,
75 (unsigned long long)pr->req->offset,
76 (unsigned long)pr->req->size,
77 (unsigned long)pr->req->serviced,
78 (unsigned long)pr->retval,
79 (unsigned int)pr->req->state,
80 (unsigned int)pr->req->targetlen, target,
81 (unsigned long long)pr->req->datalen, data);
/*
 * Take an index off peer->free_reqs and return a pointer to the matching
 * entry of the peer->peer_reqs array.
 */
85 inline struct peer_req *alloc_peer_req(struct peerd *peer)
// Pop one index from the free-request queue.
87 xqindex idx = xq_pop_head(&peer->free_reqs, 1);
// NOTE(review): confirm idx is checked against the queue's "nothing popped"
// value before it is used as an array offset here.
90 return peer->peer_reqs + idx;
/*
 * Give a peer request slot back to the peer's free-request queue.
 *
 * pr must point into peer->peer_reqs: its queue index is derived from the
 * pointer difference.
 */
93 inline void free_peer_req(struct peerd *peer, struct peer_req *pr)
// Slot index = offset of pr from the start of the peer_reqs array.
95 xqindex idx = pr - peer->peer_reqs;
// Put the index back on the free queue.
97 xq_append_head(&peer->free_reqs, idx, 1);
/*
 * Timing statistics for responding to requests: resp_accum is the accumulated
 * time and responds the number of samples, as reported by
 * get_responds_stats(). resp_start/resp_end are scratch timestamps.
 */
struct timeval resp_start, resp_end, resp_accum = {0, 0};
uint64_t responds = 0;

/*
 * Print the accumulated respond time (seconds.microseconds) and the number
 * of responds to stdout.
 *
 * tv_sec and tv_usec have implementation-defined integer types (time_t and
 * suseconds_t), so they are cast to long to match the %ld conversions.
 */
void get_responds_stats(void)
{
	printf("Time waiting respond %ld.%06ld sec for %llu times.\n",
	       (long)resp_accum.tv_sec, (long)resp_accum.tv_usec,
	       (unsigned long long)responds);
}
/*
 * Finish a peer request as failed: set XS_FAILED on the xseg request,
 * respond to it, signal the port returned by xseg_respond(), and give the
 * peer request slot back to the free queue.
 */
108 void fail(struct peerd *peer, struct peer_req *pr)
110 struct xseg_request *req = pr->req;
// Log the slot index of the request being failed.
112 XSEGLOG2(&lc, D, "failing req %u", (unsigned int) (pr - peer->peer_reqs));
// OR the failure flag into the existing state rather than overwriting it.
113 req->state |= XS_FAILED;
114 //xseg_set_req_data(peer->xseg, pr->req, NULL);
// NOTE(review): `p` must be declared before this point (check_ports() stores
// the same call's result in an xport); the result is passed to xseg_signal()
// without being checked - confirm xseg_respond() cannot fail here.
115 p = xseg_respond(peer->xseg, req, pr->portno, X_ALLOC);
// A failed signal is only logged; the slot is released either way.
116 if (xseg_signal(peer->xseg, p) < 0)
117 XSEGLOG2(&lc, W, "Cannot signal portno %u", p);
118 free_peer_req(peer, pr);
/*
 * Finish a peer request as served: set XS_SERVED on the xseg request,
 * respond to it, signal the port returned by xseg_respond(), and give the
 * peer request slot back to the free queue.
 *
 * The gettimeofday()/timersub()/timeradd() lines below are disabled timing
 * instrumentation around xseg_respond().
 */
122 void complete(struct peerd *peer, struct peer_req *pr)
124 struct xseg_request *req = pr->req;
// OR the served flag into the existing state rather than overwriting it.
126 req->state |= XS_SERVED;
127 //xseg_set_req_data(peer->xseg, pr->req, NULL);
128 //gettimeofday(&resp_start, NULL);
// NOTE(review): `p` must be declared before this point; the result is passed
// to xseg_signal() without being checked - confirm xseg_respond() cannot fail here.
129 p = xseg_respond(peer->xseg, req, pr->portno, X_ALLOC);
130 //gettimeofday(&resp_end, NULL);
132 //timersub(&resp_end, &resp_start, &resp_end);
133 //timeradd(&resp_end, &resp_accum, &resp_accum);
134 //printf("xseg_signal: %u\n", p);
// A failed signal is only logged; the slot is released either way.
135 if (xseg_signal(peer->xseg, p) < 0)
136 XSEGLOG2(&lc, W, "Cannot signal portno %u", p);
137 free_peer_req(peer, pr);
140 void pending(struct peerd *peer, struct peer_req *pr)
142 pr->req->state = XS_PENDING;
/*
 * Handle a request that was just accepted on one of the peer's ports: log
 * the event and pass the request to dispatch().
 *
 * peer: owning peer.
 * pr:   peer request slot associated with the request.
 * req:  the accepted xseg request (expected to be the same as pr->req,
 *       per the disabled assertion below).
 */
145 static void handle_accepted(struct peerd *peer, struct peer_req *pr,
146 struct xseg_request *req)
// NOTE(review): xreq is not used by any statement shown here - confirm it is
// needed.
148 struct xseg_request *xreq = pr->req;
149 //assert xreq == req;
150 XSEGLOG2(&lc, D, "Handle accepted");
152 //xreq->state = XS_ACCEPTED;
154 dispatch(peer, pr, req);
157 static void handle_received(struct peerd *peer, struct peer_req *pr,
158 struct xseg_request *req)
160 //struct xseg_request *req = pr->req;
161 //assert req->state != XS_ACCEPTED;
162 XSEGLOG2(&lc, D, "Handle received \n");
163 dispatch(peer, pr, req);
/*
 * Timing statistics for submitting requests: sub_accum is the accumulated
 * time and submits the number of samples, as reported by
 * get_submits_stats(). sub_start/sub_end are scratch timestamps.
 */
struct timeval sub_start, sub_end, sub_accum = {0, 0};
uint64_t submits = 0;

/*
 * Print the accumulated submit time (seconds.microseconds) and the number
 * of submits to stdout.
 *
 * tv_sec and tv_usec have implementation-defined integer types (time_t and
 * suseconds_t), so they are cast to long to match the %ld conversions.
 */
void get_submits_stats(void)
{
	printf("Time waiting submit %ld.%06ld sec for %llu times.\n",
	       (long)sub_accum.tv_sec, (long)sub_accum.tv_usec,
	       (unsigned long long)submits);
}
/*
 * Submit the xseg request attached to a peer request.
 *
 * Stores pr as the request's data (check_ports() reads it back with
 * xseg_get_req_data() on receive), submits the request through pr->portno,
 * and signals the port returned by xseg_submit().
 *
 * NOTE(review): the declaration of `ret`, the checks on the two results and
 * the return statement are not shown here - confirm both calls are checked
 * before their results are used.
 */
174 int submit_peer_req(struct peerd *peer, struct peer_req *pr)
177 struct xseg_request *req = pr->req;
178 // assert req->portno == peer->portno ?
179 XSEGLOG2 (&lc, D, "submitting peer req %u\n", (unsigned int)(pr - peer->peer_reqs));
// Attach the peer request to the xseg request.
180 ret = xseg_set_req_data(peer->xseg, req, (void *)(pr));
183 //printf("pr: %x , req_data: %x \n", pr, xseg_get_req_data(peer->xseg, req));
184 //gettimeofday(&sub_start, NULL);
// ret is reused: from here on it holds the result of xseg_submit().
185 ret = xseg_submit(peer->xseg, req, pr->portno, X_ALLOC);
186 //gettimeofday(&sub_end, NULL);
188 //timersub(&sub_end, &sub_start, &sub_end);
189 //timeradd(&sub_end, &sub_accum, &sub_accum);
// The submit result is used as the port to signal; the signal's own result is ignored.
192 xseg_signal(peer->xseg, ret);
/*
 * Make one pass over the peer's ports, portno_start through portno_end
 * inclusive. On each port it tries, without blocking, to accept a request and
 * to receive a request, and hands what it finds to handle_accepted() /
 * handle_received().
 *
 * NOTE(review): the conditions guarding the calls below and the return
 * statement are not shown here; peerd_loop() tests the result with
 * `if (check_ports(peer))`.
 */
196 static int check_ports(struct peerd *peer)
198 struct xseg *xseg = peer->xseg;
199 xport portno_start = peer->portno_start;
200 xport portno_end = peer->portno_end;
201 struct xseg_request *accepted, *received;
// Visit every port in the range, both ends included.
206 for (i = portno_start; i <= portno_end; i++) {
// Reserve a peer request slot for a request that may be accepted below.
209 pr = alloc_peer_req(peer);
// Non-blocking attempt to accept a request on port i.
211 accepted = xseg_accept(xseg, i, X_NONBLOCK);
// Cancel the wait on port i, then handle the accepted request.
215 xseg_cancel_wait(xseg, i);
216 handle_accepted(peer, pr, accepted);
// Give the reserved slot back (presumably when nothing was accepted - confirm).
220 free_peer_req(peer, pr);
// Non-blocking attempt to receive a request on port i.
223 received = xseg_receive(xseg, i, X_NONBLOCK);
// Read back the peer request pointer stored as the request's data
// (submit_peer_req() stores it with xseg_set_req_data()).
225 r = xseg_get_req_data(xseg, received, (void **) &pr);
// Path for a received request that carries no peer request: respond to it
// through portno_start.
227 XSEGLOG2(&lc, W, "Received request with no pr data\n");
228 xport p = xseg_respond(peer->xseg, received, peer->portno_start, X_ALLOC);
// If responding is not possible, log it and release the request.
230 XSEGLOG2(&lc, W, "Could not respond stale request");
231 xseg_put_request(xseg, received, portno_start);
// Signal the port returned by xseg_respond().
234 xseg_signal(xseg, p);
237 //maybe perform sanity check for pr
// Cancel the wait on port i, then handle the received request.
238 xseg_cancel_wait(xseg, i);
239 handle_received(peer, pr, received);
/*
 * Service loop of the peer: poll the ports with check_ports() for up to
 * `threshold` iterations, then go to sleep in xseg_wait_signal() until
 * signalled or until the wait returns.
 *
 * Local signalling is initialised on portno_start on entry and torn down
 * with xseg_quit_local_signal() on the way out.
 *
 * NOTE(review): the declaration of `loops`, any enclosing loop and its exit
 * condition, and the return statement are not shown here.
 */
248 static int peerd_loop(struct peerd *peer)
250 struct xseg *xseg = peer->xseg;
251 xport portno_start = peer->portno_start;
252 xport portno_end = peer->portno_end;
// Polling budget per wake-up, scaled down by the width of the port range.
// NOTE(review): the divisor is zero when portno_end == portno_start (a
// single-port peer), which makes this division undefined - a divisor of
// (1 + portno_end - portno_start) would avoid that; confirm the intent.
253 uint64_t threshold=1000/(portno_end - portno_start);
// Kernel thread id, obtained through the raw gettid system call.
254 pid_t pid =syscall(SYS_gettid);
257 XSEGLOG2(&lc, I, "Peer has tid %u.\n", pid);
258 xseg_init_local_signal(xseg, peer->portno_start);
// Busy-poll phase: at most `threshold` passes.
260 for(loops= threshold; loops > 0; loops--) {
// Prepare the wait on portno_start before checking the ports.
262 xseg_prepare_wait(xseg, peer->portno_start);
263 if (check_ports(peer))
// Sleep phase: wait for a signal; the second argument is the wait's timeout
// value (units not shown here).
266 XSEGLOG2(&lc, I, "Peer goes to sleep\n");
267 xseg_wait_signal(xseg, 10000000UL);
268 xseg_cancel_wait(xseg, peer->portno_start);
269 XSEGLOG2(&lc, I, "Peer woke up\n");
271 xseg_quit_local_signal(xseg, peer->portno_start);
/*
 * Defer a request to the peer's defer port.
 *
 * Currently a no-op stub: both arguments are ignored. The intended
 * implementation, kept so far only as disabled code, is to check that the
 * peer can defer, submit pr->req to peer->defer_portno, signal that port,
 * and release the peer request slot.
 */
void defer_request(struct peerd *peer, struct peer_req *pr)
{
	(void)peer;
	(void)pr;
}
/*
 * Join the xseg segment described by spec.
 *
 * spec is parsed into a config, then the segment is joined by type and name
 * with the "posix" peer type. A second path creates the segment from the
 * config and joins again (presumably taken only when the first join fails -
 * confirm). Returns whatever the final xseg_join() returns.
 */
283 static struct xseg *join(char *spec)
285 struct xseg_config config;
// The parse result is deliberately discarded.
288 (void)xseg_parse_spec(spec, &config);
289 xseg = xseg_join(config.type, config.name, "posix", NULL);
// The create result is deliberately discarded; the join that follows reports
// the outcome.
293 (void)xseg_create(&config);
294 return xseg_join(config.type, config.name, "posix", NULL);
/*
 * Allocate and set up a peer.
 *
 * nr_ops:       number of peer request slots to allocate.
 * spec:         segment spec passed to join().
 * portno_start: first port to bind.
 * portno_end:   last port to bind (inclusive).
 * defer_portno: stored in peer->defer_portno.
 *
 * Steps shown: allocate the peerd and its peer_reqs array, set up the
 * free-request queue, initialise the xseg library, join the segment, bind
 * every port in the range, and reset every peer request slot.
 *
 * NOTE(review): the error-handling branches and the return statement are
 * not shown here.
 */
297 static struct peerd* peerd_init(uint32_t nr_ops, char* spec, long portno_start,
298 long portno_end, uint32_t defer_portno)
302 struct xseg_port *port;
303 peer = malloc(sizeof(struct peerd));
308 peer->nr_ops = nr_ops;
309 peer->defer_portno = defer_portno;
// calloc() zero-fills the nr_ops request slots.
311 peer->peer_reqs = calloc(nr_ops, sizeof(struct peer_req));
312 if (!peer->peer_reqs){
// Set up the queue of free slot indices, sized by nr_ops.
318 if (!xq_alloc_seq(&peer->free_reqs, nr_ops, nr_ops))
321 if (xseg_initialize()){
322 printf("cannot initialize library\n");
325 peer->xseg = join(spec);
// The long arguments are narrowed to xport here.
329 peer->portno_start = (xport) portno_start;
330 peer->portno_end= (xport) portno_end;
// Bind the first port, passing NULL as the last argument.
331 port = xseg_bind_port(peer->xseg, peer->portno_start, NULL);
// NOTE(review): peer->portno_start is printed with %ld here but with %u in
// the "Peer on ports" message below - confirm which conversion matches xport.
333 printf("cannot bind to port %ld\n", peer->portno_start);
// Bind the remaining ports, passing the first port's signal descriptor so
// that (presumably) all ports share it - confirm.
338 for (p = peer->portno_start + 1; p <= peer->portno_end; p++) {
339 struct xseg_port *tmp;
340 tmp = xseg_bind_port(peer->xseg, p, (void *)xseg_get_signal_desc(peer->xseg, port));
342 printf("cannot bind to port %ld\n", p);
347 printf("Peer on ports %u-%u\n", peer->portno_start,
// Reset every request slot: back-pointer to the peer, no request, no private
// data, no port.
350 for (i = 0; i < nr_ops; i++) {
351 peer->peer_reqs[i].peer = peer;
352 peer->peer_reqs[i].req = NULL;
353 peer->peer_reqs[i].retval = 0;
354 peer->peer_reqs[i].priv = NULL;
355 peer->peer_reqs[i].portno = NoPort;
/*
 * Entry point of the peer daemon.
 *
 * Parses the command line, initialises logging, creates the peer with
 * peerd_init(), runs the peer-specific custom_peer_init(), and enters
 * peerd_loop(), whose result becomes the exit status.
 *
 * Options recognised (each takes one value):
 *   -g   segment spec (per the comment below)
 *   -sp  first port number
 *   -ep  last port number
 *   -p   single port number
 *   -n   number of ops (default 16)
 *   -v   verbose/debug level (default 0)
 *   -dp  defer port number (default NoPort)
 *   -l   presumably the log file path - confirm
 *
 * NOTE(review): the option bodies, argument validation and error checks on
 * peerd_init()/custom_peer_init() are not shown here.
 */
361 int main(int argc, char *argv[])
363 struct peerd *peer = NULL;
// -1 marks a port option that was not given.
367 long portno_start = -1, portno_end = -1, portno = -1;;
369 uint32_t nr_ops = 16;
370 unsigned int debug_level = 0;
371 uint32_t defer_portno = NoPort;
372 char *logfile = NULL;
374 //capture here -g spec, -n nr_ops, -p portno, -v verbose level
375 // -dp xseg_portno to defer blocking requests
377 //TODO print messages on arg parsing error
// Each option is matched only when a value follows it (i + 1 < argc).
379 for (i = 1; i < argc; i++) {
380 if (!strcmp(argv[i], "-g") && i + 1 < argc) {
// NOTE(review): the strtoul() results below are stored without checking for
// conversion errors or range; -v uses atoi(), which reports no errors.
386 if (!strcmp(argv[i], "-sp") && i + 1 < argc) {
387 portno_start = strtoul(argv[i+1], NULL, 10);
392 if (!strcmp(argv[i], "-ep") && i + 1 < argc) {
393 portno_end = strtoul(argv[i+1], NULL, 10);
398 if (!strcmp(argv[i], "-p") && i + 1 < argc) {
399 portno = strtoul(argv[i+1], NULL, 10);
404 if (!strcmp(argv[i], "-n") && i + 1 < argc) {
405 nr_ops = strtoul(argv[i+1], NULL, 10);
409 if (!strcmp(argv[i], "-v") && i + 1 < argc ) {
410 debug_level = atoi(argv[i+1]);
414 if (!strcmp(argv[i], "-dp") && i + 1 < argc ) {
415 defer_portno = strtoul(argv[i+1], NULL, 10);
419 if (!strcmp(argv[i], "-l") && i + 1 < argc ) {
// Logging is set up with the program name, the debug level and the log file.
426 init_logctx(&lc, argv[0], debug_level, logfile);
427 //TODO perform argument sanity checks
428 verbose = debug_level;
// The single-port option feeds portno_start (presumably only when -p was
// given - confirm).
431 portno_start = portno;
436 peer = peerd_init(nr_ops, spec, portno_start, portno_end, defer_portno);
// Peer-specific initialisation sees the full command line.
439 r = custom_peer_init(peer, argc, argv);
442 return peerd_loop(peer);