9 #include <sys/syscall.h>
/* Global verbosity level; main() copies the -v debug level into it. */
13 unsigned int verbose = 0;
21 void (*func)(void *arg);
26 inline int canDefer(struct peerd *peer)
28 return !(peer->defer_portno == NoPort);
31 void print_req(struct xseg *xseg, struct xseg_request *req)
33 char target[64], data[64];
34 char *req_target, *req_data;
35 unsigned int end = (req->targetlen> 63) ? 63 : req->targetlen;
36 req_target = xseg_get_target(xseg, req);
37 req_data = xseg_get_data(xseg, req);
40 strncpy(target, req_target, end);
42 strncpy(data, req_data, 63);
44 printf("req id:%lu, op:%u %llu:%lu serviced: %lu, reqstate: %u\n"
45 "src: %u, st: %u, dst: %u dt: %u\n"
46 "target[%u]:'%s', data[%llu]:\n%s------------------\n\n",
48 (unsigned int)req->op,
49 (unsigned long long)req->offset,
50 (unsigned long)req->size,
51 (unsigned long)req->serviced,
52 (unsigned int)req->state,
53 (unsigned int)req->src_portno,
54 (unsigned int)req->src_transit_portno,
55 (unsigned int)req->dst_portno,
56 (unsigned int)req->dst_transit_portno,
57 (unsigned int)req->targetlen, target,
58 (unsigned long long)req->datalen, data);
61 void log_pr(char *msg, struct peer_req *pr)
63 char target[64], data[64];
64 char *req_target, *req_data;
65 struct peerd *peer = pr->peer;
66 struct xseg *xseg = pr->peer->xseg;
67 req_target = xseg_get_target(xseg, pr->req);
68 req_data = xseg_get_data(xseg, pr->req);
69 /* null terminate name in case of req->target is less than 63 characters,
70 * and next character after name (aka first byte of next buffer) is not
73 unsigned int end = (pr->req->targetlen> 63) ? 63 : pr->req->targetlen;
75 strncpy(target, req_target, end);
77 strncpy(data, req_data, 63);
79 printf("%s: req id:%u, op:%u %llu:%lu serviced: %lu, retval: %lu, reqstate: %u\n"
80 "target[%u]:'%s', data[%llu]:\n%s------------------\n\n",
82 (unsigned int)(pr - peer->peer_reqs),
83 (unsigned int)pr->req->op,
84 (unsigned long long)pr->req->offset,
85 (unsigned long)pr->req->size,
86 (unsigned long)pr->req->serviced,
87 (unsigned long)pr->retval,
88 (unsigned int)pr->req->state,
89 (unsigned int)pr->req->targetlen, target,
90 (unsigned long long)pr->req->datalen, data);
/*
 * Take a free peer request slot from the peer.
 *
 * Pops an index from the head of peer->free_reqs and returns the entry of
 * peer->peer_reqs at that index.
 * NOTE(review): the popped index is used with no validity check visible
 * here — confirm how xq_pop_head reports an empty queue and that callers
 * can rely on a failure indication.
 */
94 inline struct peer_req *alloc_peer_req(struct peerd *peer)
96 xqindex idx = xq_pop_head(&peer->free_reqs, 1);
99 return peer->peer_reqs + idx;
/*
 * Return a peer request slot to the peer's free list.
 *
 * The slot is identified by its index inside peer->peer_reqs, which is
 * appended to the head of peer->free_reqs.
 */
102 inline void free_peer_req(struct peerd *peer, struct peer_req *pr)
/* Index of pr within the peer_reqs array. */
104 xqindex idx = pr - peer->peer_reqs;
106 xq_append_head(&peer->free_reqs, idx, 1);
/*
 * Take a thread descriptor from the peer's thread queue.
 *
 * Pops an index from the head of peer->threads and returns the entry of
 * peer->thread at that index.
 * NOTE(review): the popped index is used with no validity check visible
 * here — confirm how xq_pop_head reports an empty queue.
 */
109 inline static struct thread* alloc_thread(struct peerd *peer)
111 xqindex idx = xq_pop_head(&peer->threads, 1);
114 return peer->thread + idx;
117 inline static void free_thread(struct peerd *peer, struct thread *t)
119 xqindex idx = t - peer->thread;
120 xq_append_head(&peer->threads, idx, 1);
124 inline static void __wake_up_thread(struct thread *t)
126 pthread_mutex_lock(&t->lock);
127 pthread_cond_signal(&t->cond);
128 pthread_mutex_unlock(&t->lock);
/*
 * Wake up the given worker thread.
 * NOTE(review): presumably a checked wrapper around __wake_up_thread() —
 * confirm against the function body.
 */
131 inline static void wake_up_thread(struct thread* t)
138 inline static int wake_up_next_thread(struct peerd *peer)
140 //struct thread *t = alloc_thread(peer);
143 return (xseg_signal(peer->xseg, peer->portno_start));
/*
 * Timing counters for responds: scratch start/end timestamps, the
 * accumulated waiting time and the number of responds counted.
 * NOTE(review): no active code visible near these definitions updates the
 * counters — confirm before relying on the reported numbers.
 */
struct timeval resp_start, resp_end, resp_accum = {0, 0};
uint64_t responds = 0;

/*
 * Print the accumulated respond waiting time and respond count to stdout.
 */
void get_responds_stats(void)
{
	/* time_t/suseconds_t have implementation-defined width: cast for %lu. */
	printf("Time waiting respond %lu.%06lu sec for %llu times.\n",
			(unsigned long)resp_accum.tv_sec,
			(unsigned long)resp_accum.tv_usec,
			(unsigned long long)responds);
}
155 void fail(struct peerd *peer, struct peer_req *pr)
157 struct xseg_request *req = pr->req;
159 XSEGLOG2(&lc, D, "failing req %u", (unsigned int) (pr - peer->peer_reqs));
160 req->state |= XS_FAILED;
161 //xseg_set_req_data(peer->xseg, pr->req, NULL);
162 p = xseg_respond(peer->xseg, req, pr->portno, X_ALLOC);
163 xseg_signal(peer->xseg, p);
164 free_peer_req(peer, pr);
165 wake_up_next_thread(peer);
169 void complete(struct peerd *peer, struct peer_req *pr)
171 struct xseg_request *req = pr->req;
173 req->state |= XS_SERVED;
174 //xseg_set_req_data(peer->xseg, pr->req, NULL);
175 //gettimeofday(&resp_start, NULL);
176 p = xseg_respond(peer->xseg, req, pr->portno, X_ALLOC);
177 //gettimeofday(&resp_end, NULL);
179 //timersub(&resp_end, &resp_start, &resp_end);
180 //timeradd(&resp_end, &resp_accum, &resp_accum);
181 //printf("xseg_signal: %u\n", p);
182 xseg_signal(peer->xseg, p);
183 free_peer_req(peer, pr);
184 wake_up_next_thread(peer);
187 void pending(struct peerd *peer, struct peer_req *pr)
189 pr->req->state = XS_PENDING;
/*
 * Handle a request that was just accepted on one of the peer's ports.
 *
 * Logs the event and hands the request to dispatch() with the
 * dispatch_accept reason.
 *
 * peer: peer that accepted the request.
 * pr:   peer request slot associated with the request.
 * req:  the accepted xseg request.
 */
192 static void handle_accepted(struct peerd *peer, struct peer_req *pr,
193 struct xseg_request *req)
/* The request stored in the slot; expected to be the same object as req. */
195 struct xseg_request *xreq = pr->req;
196 //assert xreq == req;
197 XSEGLOG2(&lc, D, "Handle accepted");
199 //xreq->state = XS_ACCEPTED;
201 dispatch(peer, pr, req, dispatch_accept);
204 static void handle_received(struct peerd *peer, struct peer_req *pr,
205 struct xseg_request *req)
207 //struct xseg_request *req = pr->req;
208 //assert req->state != XS_ACCEPTED;
209 XSEGLOG2(&lc, D, "Handle received \n");
210 dispatch(peer, pr, req, dispatch_receive);
/*
 * Timing counters for submits: scratch start/end timestamps, the
 * accumulated waiting time and the number of submits counted.
 * NOTE(review): no active code visible near these definitions updates the
 * counters — confirm before relying on the reported numbers.
 */
struct timeval sub_start, sub_end, sub_accum = {0, 0};
uint64_t submits = 0;

/*
 * Print the accumulated submit waiting time and submit count to stdout.
 */
void get_submits_stats(void)
{
	/* time_t/suseconds_t have implementation-defined width: cast for %lu. */
	printf("Time waiting submit %lu.%06lu sec for %llu times.\n",
			(unsigned long)sub_accum.tv_sec,
			(unsigned long)sub_accum.tv_usec,
			(unsigned long long)submits);
}
/*
 * Submit the xseg request held by a peer request.
 *
 * The peer request pointer is attached to the xseg request as its request
 * data, the request is submitted through pr->portno with X_ALLOC, and the
 * port returned by xseg_submit() is signalled.
 *
 * peer: peer that owns pr.
 * pr:   peer request whose pr->req is submitted.
 *
 * NOTE(review): `ret` carries the result of both xseg_set_req_data() and
 * xseg_submit(); confirm that each result is checked before it is reused
 * and before it is passed to xseg_signal().
 */
221 int submit_peer_req(struct peerd *peer, struct peer_req *pr)
224 struct xseg_request *req = pr->req;
225 // assert req->portno == peer->portno ?
226 //TODO small function with error checking
227 XSEGLOG2 (&lc, D, "submitting peer req %u\n", (unsigned int)(pr - peer->peer_reqs));
/* Remember which peer request this xseg request belongs to. */
228 ret = xseg_set_req_data(peer->xseg, req, (void *)(pr));
231 //printf("pr: %x , req_data: %x \n", pr, xseg_get_req_data(peer->xseg, req));
232 //gettimeofday(&sub_start, NULL);
233 ret = xseg_submit(peer->xseg, req, pr->portno, X_ALLOC);
234 //gettimeofday(&sub_end, NULL);
236 //timersub(&sub_end, &sub_start, &sub_end);
237 //timeradd(&sub_end, &sub_accum, &sub_accum);
/* Signal the port xseg_submit() returned. */
240 xseg_signal(peer->xseg, ret);
/*
 * Ask a worker thread to run func(arg).
 *
 * A thread descriptor is taken from the peer with alloc_thread().
 * NOTE(review): how the function is handed to the thread, and what is
 * returned when no thread is available, should be confirmed against the
 * full body.
 */
244 int thread_execute(struct peerd *peer, void (*func)(void *arg), void *arg)
246 struct thread *t = alloc_thread(peer);
253 // we could hijack a thread
/*
 * Body of every worker thread (passed to pthread_create()).
 *
 * arg is the worker's struct thread.  The worker scans the peer's ports
 * portno_start..portno_end, accepting new requests (xseg_accept) and
 * collecting replies (xseg_receive) without blocking, and hands them to
 * handle_accepted() / handle_received().  When idle it waits in
 * xseg_wait_signal().
 *
 * NOTE(review): the exact loop and branch structure around these calls
 * should be confirmed against the full body; the comments below describe
 * individual calls only.
 */
257 static void* thread_loop(void *arg)
259 struct thread *t = (struct thread *) arg;
260 struct peerd *peer = t->peer;
261 struct xseg *xseg = peer->xseg;
262 xport portno_start = peer->portno_start;
263 xport portno_end = peer->portno_end;
/* Initial value of the `loops` counter used by the polling loop below. */
265 uint64_t threshold=1;
/* Kernel thread id, used only for logging. */
266 pid_t pid =syscall(SYS_gettid);
268 struct xseg_request *accepted, *received;
273 XSEGLOG2(&lc, D, "thread %u\n", (unsigned int) (t- peer->thread));
275 XSEGLOG2(&lc, I, "Thread %u has tid %u.\n", (unsigned int) (t- peer->thread), pid);
276 xseg_init_local_signal(xseg, peer->portno_start);
279 XSEGLOG2(&lc, D, "Thread %u executes function\n", (unsigned int) (t- peer->thread));
280 xseg_cancel_wait(xseg, peer->portno_start);
287 for(loops= threshold; loops > 0; loops--) {
290 xseg_prepare_wait(xseg, peer->portno_start);
/* Visit every port this peer is bound to. */
292 for (i = portno_start; i <= portno_end; i++) {
/* Reserve a peer request slot before trying to accept. */
295 pr = alloc_peer_req(peer);
297 accepted = xseg_accept(xseg, i, X_NONBLOCK);
299 XSEGLOG2(&lc, D, "Thread %u accepted\n", (unsigned int) (t- peer->thread));
302 xseg_cancel_wait(xseg, i);
303 wake_up_next_thread(peer);
304 handle_accepted(peer, pr, accepted);
/* Give the reserved slot back. NOTE(review): presumably the nothing-accepted path — confirm. */
308 free_peer_req(peer, pr);
311 received = xseg_receive(xseg, i, X_NONBLOCK);
313 //printf("received req id: %u\n", received - xseg->requests);
314 //print_req(peer->xseg, received);
/* Recover the peer request that submit_peer_req() attached to the request. */
315 r = xseg_get_req_data(xseg, received, (void **) &pr);
317 //FIXME what to do here ?
318 XSEGLOG2(&lc, W, "Received request with no pr data\n");
319 xseg_respond(peer->xseg, received, peer->portno_start, X_ALLOC);
320 //FIXME if fails, put req
322 xseg_cancel_wait(xseg, i);
323 wake_up_next_thread(peer);
324 handle_received(peer, pr, received);
332 XSEGLOG2(&lc, I, "Thread %u goes to sleep\n", (unsigned int) (t- peer->thread));
/* NOTE(review): the unit of the 10000000UL timeout is not evident here — confirm. */
333 xseg_wait_signal(xseg, 10000000UL);
334 xseg_cancel_wait(xseg, peer->portno_start);
335 XSEGLOG2(&lc, I, "Thread %u woke up\n", (unsigned int) (t- peer->thread));
/*
 * Placeholder for handing a request over to the peer's defer port.
 *
 * The function currently does nothing.  The intended steps — submitting
 * pr->req to peer->defer_portno, signalling that port and releasing pr —
 * are not implemented.
 */
void defer_request(struct peerd *peer, struct peer_req *pr)
{
	(void)peer;
	(void)pr;
}
/*
 * Main-thread loop of the peer daemon.
 *
 * Runs peer->interactive_func when one is installed and joins the first
 * worker thread.
 * NOTE(review): confirm whether the join is an alternative to the
 * interactive function or always follows it.
 */
348 static int peerd_loop(struct peerd *peer)
350 if (peer->interactive_func)
351 peer->interactive_func();
/* Block until worker 0 terminates. */
353 pthread_join(peer->thread[0].tid, NULL);
/*
 * Join the xseg segment described by spec, creating it if needed.
 *
 * spec is parsed into an xseg_config; the segment is joined with the
 * "pthread" signalling type.  A second join attempt is made after calling
 * xseg_create().
 * NOTE(review): the results of xseg_parse_spec() and xseg_create() are
 * deliberately discarded; presumably the create path runs only when the
 * first join fails — confirm.
 */
358 static struct xseg *join(char *spec)
360 struct xseg_config config;
363 (void)xseg_parse_spec(spec, &config);
364 xseg = xseg_join(config.type, config.name, "pthread", NULL);
368 (void)xseg_create(&config);
369 return xseg_join(config.type, config.name, "pthread", NULL);
372 int peerd_start_threads(struct peerd *peer)
375 uint32_t nr_threads = peer->nr_threads;
377 for (i = 0; i < nr_threads; i++) {
378 peer->thread[i].peer = peer;
379 pthread_cond_init(&peer->thread[i].cond,NULL);
380 pthread_mutex_init(&peer->thread[i].lock, NULL);
381 pthread_create(&peer->thread[i].tid, NULL, thread_loop, (void *)(peer->thread + i));
382 peer->thread[i].func = NULL;
383 peer->thread[i].arg = NULL;
/*
 * Allocate and set up a peer daemon descriptor.
 *
 * nr_ops:       number of peer request slots to allocate.
 * spec:         xseg segment specification handed to join().
 * portno_start: first xseg port to bind.
 * portno_end:   last xseg port to bind.
 * nr_threads:   number of worker thread descriptors to allocate.
 * defer_portno: port stored as the peer's defer port (NoPort when unused).
 *
 * NOTE(review): the failure paths (what is freed and what is returned)
 * should be confirmed against the full body.
 */
389 static struct peerd* peerd_init(uint32_t nr_ops, char* spec, long portno_start,
390 long portno_end, uint32_t nr_threads, uint32_t defer_portno)
394 struct xseg_port *port;
395 peer = malloc(sizeof(struct peerd));
400 peer->nr_ops = nr_ops;
401 peer->defer_portno = defer_portno;
402 peer->nr_threads = nr_threads;
/* Zero-initialised arrays of worker descriptors and peer request slots. */
404 peer->thread = calloc(nr_threads, sizeof(struct thread));
407 peer->peer_reqs = calloc(nr_ops, sizeof(struct peer_req));
408 if (!peer->peer_reqs){
/* free_reqs is allocated with xq_alloc_seq, threads with xq_alloc_empty. */
414 if (!xq_alloc_seq(&peer->free_reqs, nr_ops, nr_ops))
416 if (!xq_alloc_empty(&peer->threads, nr_threads))
419 if (xseg_initialize()){
420 printf("cannot initialize library\n");
423 peer->xseg = join(spec);
427 peer->portno_start = (xport) portno_start;
428 peer->portno_end= (xport) portno_end;
/* Bind the first port with no explicit signal descriptor. */
429 port = xseg_bind_port(peer->xseg, peer->portno_start, NULL);
/* NOTE(review): %ld is used with peer->portno_start, while the "Peer on ports" message prints it with %u — confirm the type. */
431 printf("cannot bind to port %ld\n", peer->portno_start);
/* Bind the remaining ports, passing the first port's signal descriptor. */
436 for (p = peer->portno_start + 1; p <= peer->portno_end; p++) {
437 struct xseg_port *tmp;
438 tmp = xseg_bind_port(peer->xseg, p, (void *)xseg_get_signal_desc(peer->xseg, port));
440 printf("cannot bind to port %ld\n", p);
445 printf("Peer on ports %u-%u\n", peer->portno_start,
/* Put every peer request slot into a known empty state. */
448 for (i = 0; i < nr_ops; i++) {
449 peer->peer_reqs[i].peer = peer;
450 peer->peer_reqs[i].req = NULL;
451 peer->peer_reqs[i].retval = 0;
452 peer->peer_reqs[i].priv = NULL;
453 peer->peer_reqs[i].portno = NoPort;
455 peer->interactive_func = NULL;
/*
 * Entry point of the peer daemon.
 *
 * Parses the command line, initialises logging, creates the peer with
 * peerd_init(), runs the peer-specific custom_peer_init(), starts the
 * worker threads and enters peerd_loop().
 *
 * Options recognised below (each takes one value):
 *   -g   segment spec              -n   number of peer requests (default 16)
 *   -sp  first port                -t   number of threads (default 16)
 *   -ep  last port                 -v   debug level (default 0)
 *   -p   single port               -dp  defer port (default NoPort)
 *   -l   NOTE(review): presumably the log file path — confirm
 */
460 int main(int argc, char *argv[])
462 struct peerd *peer = NULL;
/* -1 marks "not given on the command line". */
466 long portno_start = -1, portno_end = -1, portno = -1;
468 uint32_t nr_ops = 16;
469 uint32_t nr_threads = 16 ;
470 unsigned int debug_level = 0;
471 uint32_t defer_portno = NoPort;
472 char *logfile = NULL;
474 //capture here -g spec, -n nr_ops, -p portno, -t nr_threads -v verbose level
475 // -dp xseg_portno to defer blocking requests
477 //TODO print messages on arg parsing error
/*
 * NOTE(review): strtoul()/atoi() results are stored without any error or
 * range checking, and strtoul() results are assigned to long and uint32_t
 * variables — malformed input is silently accepted.
 */
479 for (i = 1; i < argc; i++) {
480 if (!strcmp(argv[i], "-g") && i + 1 < argc) {
486 if (!strcmp(argv[i], "-sp") && i + 1 < argc) {
487 portno_start = strtoul(argv[i+1], NULL, 10);
492 if (!strcmp(argv[i], "-ep") && i + 1 < argc) {
493 portno_end = strtoul(argv[i+1], NULL, 10);
498 if (!strcmp(argv[i], "-p") && i + 1 < argc) {
499 portno = strtoul(argv[i+1], NULL, 10);
504 if (!strcmp(argv[i], "-n") && i + 1 < argc) {
505 nr_ops = strtoul(argv[i+1], NULL, 10);
509 if (!strcmp(argv[i], "-v") && i + 1 < argc ) {
510 debug_level = atoi(argv[i+1]);
514 if (!strcmp(argv[i], "-t") && i + 1 < argc ) {
515 nr_threads = strtoul(argv[i+1], NULL, 10);
519 if (!strcmp(argv[i], "-dp") && i + 1 < argc ) {
520 defer_portno = strtoul(argv[i+1], NULL, 10);
524 if (!strcmp(argv[i], "-l") && i + 1 < argc ) {
531 init_logctx(&lc, argv[0], debug_level, logfile);
532 XSEGLOG2(&lc, D, "Main thread has tid %ld.\n", syscall(SYS_gettid));
534 //TODO perform argument sanity checks
/* The global verbosity follows the -v debug level. */
535 verbose = debug_level;
/* NOTE(review): presumably -p supplies the port range when -sp/-ep are absent — confirm. */
537 portno_start = portno;
542 peer = peerd_init(nr_ops, spec, portno_start, portno_end, nr_threads, defer_portno);
/* Peer-specific initialisation; receives the raw command line as well. */
545 r = custom_peer_init(peer, argc, argv);
/* NOTE(review): the result of peerd_start_threads() is not checked. */
548 peerd_start_threads(peer);
549 return peerd_loop(peer);