9 #include <sys/syscall.h>
/*
 * Logging helpers.
 *
 * REARRANGE() emits the format string first with "%s" appended to it, then
 * the function name, then the remaining arguments.  LOG() passes "" as the
 * last argument, which is what the appended "%s" consumes.
 *
 * LOG() writes to stderr, prefixed with the calling function's name, when
 * `level` is less than or equal to the global `verbose`.
 *
 * NOTE(review): this is a line-numbered excerpt; whatever opens and closes
 * the LOG body around the `if` is not visible here.
 */
13 #define REARRANGE(__fun_name__, __format__, ...) __format__ "%s", __fun_name__, ##__VA_ARGS__
14 #define LOG(level, ...) \
16 if (level <= verbose) { \
17 fprintf(stderr, "%s: " REARRANGE( __func__ , ## __VA_ARGS__, "" )); \
/* Verbosity threshold that LOG() compares its level against; starts at 0. */
22 unsigned int verbose = 0;
/*
 * Pointer to a function taking a single void * argument.
 * NOTE(review): the declaration enclosing this line is not visible in this
 * excerpt; presumably a member of struct thread -- confirm.
 */
29 void (*func)(void *arg);
/* Returns non-zero when peer->defer_portno is anything other than NoPort. */
34 inline int canDefer(struct peerd *peer)
36 return !(peer->defer_portno == NoPort);
/*
 * Dump an xseg request to stdout.
 *
 * Copies at most 63 bytes of the request's target (fewer when targetlen is
 * smaller) and 63 bytes of its data into local 64-byte buffers, then prints
 * op, offset, size, serviced, state, the four port numbers, targetlen with
 * the target copy, and datalen with the data copy.
 *
 * NOTE(review): strncpy() does not NUL-terminate when the source is at least
 * as long as the count; the lines following each strncpy are not visible in
 * this excerpt -- confirm they terminate both buffers before the printf.
 * NOTE(review): the argument feeding "req id:%lu" is also not visible here.
 */
39 void print_req(struct xseg *xseg, struct xseg_request *req)
41 char target[64], data[64];
42 char *req_target, *req_data;
// Clamp the number of target bytes copied so one byte of target[] stays free.
43 unsigned int end = (req->targetlen> 63) ? 63 : req->targetlen;
44 req_target = xseg_get_target(xseg, req);
45 req_data = xseg_get_data(xseg, req);
48 strncpy(target, req_target, end);
50 strncpy(data, req_data, 63);
52 printf("req id:%lu, op:%u %llu:%lu serviced: %lu, reqstate: %u\n"
53 "src: %u, st: %u, dst: %u dt: %u\n"
54 "target[%u]:'%s', data[%llu]:\n%s------------------\n\n",
56 (unsigned int)req->op,
57 (unsigned long long)req->offset,
58 (unsigned long)req->size,
59 (unsigned long)req->serviced,
60 (unsigned int)req->state,
61 (unsigned int)req->src_portno,
62 (unsigned int)req->src_transit_portno,
63 (unsigned int)req->dst_portno,
64 (unsigned int)req->dst_transit_portno,
65 (unsigned int)req->targetlen, target,
66 (unsigned long long)req->datalen, data);
/*
 * Print a peer request to stdout.
 *
 * The printed request id is pr's index within peer->peer_reqs.  Also printed
 * are op, offset, size, serviced, pr->retval, state, targetlen with a local
 * copy of the target (at most 63 bytes), and datalen with a local copy of
 * the first 63 bytes of data.
 *
 * NOTE(review): the argument feeding the leading "%s:" is not visible in
 * this excerpt (presumably msg -- confirm).  The lines following each
 * strncpy, and the end of the in-body comment, are not visible either;
 * confirm both buffers are NUL-terminated before the printf.
 */
69 void log_pr(char *msg, struct peer_req *pr)
71 char target[64], data[64];
72 char *req_target, *req_data;
73 struct peerd *peer = pr->peer;
74 struct xseg *xseg = pr->peer->xseg;
75 req_target = xseg_get_target(xseg, pr->req);
76 req_data = xseg_get_data(xseg, pr->req);
77 /* null terminate name in case of req->target is less than 63 characters,
78 * * and next character after name (aka first byte of next buffer) is not
81 unsigned int end = (pr->req->targetlen> 63) ? 63 : pr->req->targetlen;
83 strncpy(target, req_target, end);
85 strncpy(data, req_data, 63);
87 printf("%s: req id:%u, op:%u %llu:%lu serviced: %lu, retval: %lu, reqstate: %u\n"
88 "target[%u]:'%s', data[%llu]:\n%s------------------\n\n",
90 (unsigned int)(pr - peer->peer_reqs),
91 (unsigned int)pr->req->op,
92 (unsigned long long)pr->req->offset,
93 (unsigned long)pr->req->size,
94 (unsigned long)pr->req->serviced,
95 (unsigned long)pr->retval,
96 (unsigned int)pr->req->state,
97 (unsigned int)pr->req->targetlen, target,
98 (unsigned long long)pr->req->datalen, data);
/*
 * Take an index from the head of peer->free_reqs and return the matching
 * slot of the peer->peer_reqs array.
 * NOTE(review): the lines between the pop and the return are not visible in
 * this excerpt; confirm an empty queue is handled before idx is used.
 */
102 inline struct peer_req *alloc_peer_req(struct peerd *peer)
104 xqindex idx = xq_pop_head(&peer->free_reqs, 1);
107 return peer->peer_reqs + idx;
/*
 * Return a peer request to the free pool: pr's index within peer->peer_reqs
 * is handed to peer->free_reqs through xq_append_head().
 * NOTE(review): one line between the two statements is not visible here.
 */
110 inline void free_peer_req(struct peerd *peer, struct peer_req *pr)
112 xqindex idx = pr - peer->peer_reqs;
114 xq_append_head(&peer->free_reqs, idx, 1);
/*
 * Take an index from the head of peer->threads and return the matching
 * entry of the peer->thread array.
 * NOTE(review): the lines between the pop and the return are not visible in
 * this excerpt; confirm an empty queue is handled before idx is used.
 */
117 inline static struct thread* alloc_thread(struct peerd *peer)
119 xqindex idx = xq_pop_head(&peer->threads, 1);
122 return peer->thread + idx;
/*
 * Hand a thread back to the peer->threads queue: t's index within
 * peer->thread is passed to xq_append_head().
 */
125 inline static void free_thread(struct peerd *peer, struct thread *t)
127 xqindex idx = t - peer->thread;
128 xq_append_head(&peer->threads, idx, 1);
/* Signal t->cond while holding t->lock. */
132 inline static void __wake_up_thread(struct thread *t)
134 pthread_mutex_lock(&t->lock);
135 pthread_cond_signal(&t->cond);
136 pthread_mutex_unlock(&t->lock);
/*
 * Wake-up helper taking a single thread.
 * NOTE(review): only the signature is visible in this excerpt; the body is
 * not shown, so its behaviour cannot be described from here.
 */
139 inline static void wake_up_thread(struct thread* t)
/*
 * Signal the peer's own port (peer->portno) through xseg_signal() and return
 * whatever that call returns.
 * NOTE(review): the commented-out alloc_thread() line suggests an earlier
 * version picked a specific thread to wake; the lines after it are not
 * visible in this excerpt.
 */
146 inline static int wake_up_next_thread(struct peerd *peer)
148 //struct thread *t = alloc_thread(peer);
151 return (xseg_signal(peer->xseg, peer->portno));
/*
 * Timing state for responds.  Only resp_accum and `responds` are read by
 * get_responds_stats(); nothing visible in this excerpt updates them.
 */
struct timeval resp_start, resp_end, resp_accum = {0, 0};
uint64_t responds = 0;

/*
 * Print the accumulated respond wait time (seconds.microseconds) and the
 * respond count to stdout.
 *
 * Every argument is cast to the exact type its conversion expects, matching
 * how the other printf calls in this file pass their arguments: uint64_t is
 * not guaranteed to be unsigned long long, and tv_sec/tv_usec are signed
 * types of platform-defined width.
 */
void get_responds_stats(void)
{
	printf("Time waiting respond %lu.%06lu sec for %llu times.\n",
	       (unsigned long)resp_accum.tv_sec,
	       (unsigned long)resp_accum.tv_usec,
	       (unsigned long long)responds);
}
/*
 * Fail a peer request: set XS_FAILED on the request state, respond on the
 * peer's port, signal the port value returned by xseg_respond(), return pr
 * to the free pool and wake the next thread.
 *
 * NOTE(review): the declaration of `p` is not visible in this excerpt, and
 * the value returned by xseg_respond() is passed to xseg_signal() without a
 * visible check -- confirm what happens when the respond fails.
 */
163 void fail(struct peerd *peer, struct peer_req *pr)
165 struct xseg_request *req = pr->req;
167 LOG(5, "failing req %u\n", (unsigned int) (pr - peer->peer_reqs));
168 req->state |= XS_FAILED;
169 //xseg_set_req_data(peer->xseg, pr->req, NULL);
170 p = xseg_respond(peer->xseg, req, peer->portno, X_ALLOC);
171 xseg_signal(peer->xseg, p);
172 free_peer_req(peer, pr);
173 wake_up_next_thread(peer);
/*
 * Complete a peer request: set XS_SERVED on the request state, respond on
 * the peer's port, signal the port value returned by xseg_respond(), return
 * pr to the free pool and wake the next thread.
 *
 * The commented-out gettimeofday/timersub/timeradd lines are disabled timing
 * instrumentation around the respond call.
 *
 * NOTE(review): the declaration of `p` is not visible in this excerpt, and
 * the value returned by xseg_respond() is passed to xseg_signal() without a
 * visible check -- confirm what happens when the respond fails.
 */
177 void complete(struct peerd *peer, struct peer_req *pr)
179 struct xseg_request *req = pr->req;
181 req->state |= XS_SERVED;
182 //xseg_set_req_data(peer->xseg, pr->req, NULL);
183 //gettimeofday(&resp_start, NULL);
184 p = xseg_respond(peer->xseg, req, peer->portno, X_ALLOC);
185 //gettimeofday(&resp_end, NULL);
187 //timersub(&resp_end, &resp_start, &resp_end);
188 //timeradd(&resp_end, &resp_accum, &resp_accum);
189 //printf("xseg_signal: %u\n", p);
190 xseg_signal(peer->xseg, p);
191 free_peer_req(peer, pr);
192 wake_up_next_thread(peer);
/*
 * Mark the request as pending.  The state is overwritten with XS_PENDING,
 * not OR-ed in the way fail() and complete() set their flags.  `peer` is not
 * used.
 */
195 void pending(struct peerd *peer, struct peer_req *pr)
197 pr->req->state = XS_PENDING;
/*
 * Handle a newly accepted request: log at level 4 and pass peer, pr and req
 * to dispatch().
 * NOTE(review): xreq is not used by any line visible in this excerpt; two
 * lines of the body are elided, so confirm whether it is used there.
 */
200 static void handle_accepted(struct peerd *peer, struct peer_req *pr,
201 struct xseg_request *req)
203 struct xseg_request *xreq = pr->req;
204 //assert xreq == req;
205 LOG(4, "Handle accepted \n");
207 //xreq->state = XS_ACCEPTED;
209 dispatch(peer, pr, req);
/*
 * Handle a received request: log at level 4 and pass peer, pr and req to
 * dispatch().
 */
212 static void handle_received(struct peerd *peer, struct peer_req *pr,
213 struct xseg_request *req)
215 //struct xseg_request *req = pr->req;
216 //assert req->state != XS_ACCEPTED;
217 LOG(4, "Handle received \n");
218 dispatch(peer, pr, req);
/*
 * Timing state for submits.  Only sub_accum and `submits` are read by
 * get_submits_stats(); nothing visible in this excerpt updates them.
 */
struct timeval sub_start, sub_end, sub_accum = {0, 0};
uint64_t submits = 0;

/*
 * Print the accumulated submit wait time (seconds.microseconds) and the
 * submit count to stdout.
 *
 * Every argument is cast to the exact type its conversion expects, matching
 * how the other printf calls in this file pass their arguments: uint64_t is
 * not guaranteed to be unsigned long long, and tv_sec/tv_usec are signed
 * types of platform-defined width.
 */
void get_submits_stats(void)
{
	printf("Time waiting submit %lu.%06lu sec for %llu times.\n",
	       (unsigned long)sub_accum.tv_sec,
	       (unsigned long)sub_accum.tv_usec,
	       (unsigned long long)submits);
}
/*
 * Submit pr's request on the peer's port.
 *
 * pr itself is stored as the request's data through xseg_set_req_data(),
 * the request is submitted with xseg_submit(), and the value xseg_submit()
 * returns is passed to xseg_signal().
 *
 * NOTE(review): the declaration of `ret`, any checks on the two return
 * values, and the function's return statement are not visible in this
 * excerpt.  The commented-out gettimeofday/timersub/timeradd lines are
 * disabled timing instrumentation.
 */
229 int submit_peer_req(struct peerd *peer, struct peer_req *pr)
232 struct xseg_request *req = pr->req;
233 // assert req->portno == peer->portno ?
234 //TODO small function with error checking
235 LOG (5, "submitting peer req %u\n", (unsigned int)(pr - peer->peer_reqs));
236 ret = xseg_set_req_data(peer->xseg, req, (void *)(pr));
239 //printf("pr: %x , req_data: %x \n", pr, xseg_get_req_data(peer->xseg, req));
240 //gettimeofday(&sub_start, NULL);
241 ret = xseg_submit(peer->xseg, req, peer->portno, X_ALLOC);
242 //gettimeofday(&sub_end, NULL);
244 //timersub(&sub_end, &sub_start, &sub_end);
245 //timeradd(&sub_end, &sub_accum, &sub_accum);
248 xseg_signal(peer->xseg, ret);
/*
 * Run func(arg) on a peer thread.  The only visible step is obtaining a
 * thread with alloc_thread().
 * NOTE(review): how func/arg are handed to the thread, what happens when no
 * thread is available, and the return value are not visible in this excerpt.
 */
252 int thread_execute(struct peerd *peer, void (*func)(void *arg), void *arg)
254 struct thread *t = alloc_thread(peer);
261 // we could hijack a thread
/*
 * Worker thread entry point; arg is the struct thread this worker owns.
 *
 * Visible steps: print/log the thread index and kernel tid, initialise the
 * local signal for the peer's port, then (inside a loop counted down from
 * `threshold`) prepare to wait, accept new requests into a freshly allocated
 * peer_req, receive replies and recover their peer_req from the request
 * data, and finally wait for a signal when nothing was handled.
 *
 * NOTE(review): this excerpt elides most of the control flow (outer loop,
 * conditions, declarations of loops/pr/r, return); the comments below only
 * describe the calls that are visible.
 */
265 static void* thread_loop(void *arg)
267 struct thread *t = (struct thread *) arg;
268 struct peerd *peer = t->peer;
269 struct xseg *xseg = peer->xseg;
270 uint32_t portno = peer->portno;
272 uint64_t threshold=1000;
// Kernel thread id, obtained through the raw gettid syscall.
273 pid_t pid =syscall(SYS_gettid);
275 struct xseg_request *accepted, *received;
278 printf("thread %u\n", (unsigned int) (t- peer->thread));
// NOTE(review): pid is a pid_t but is printed with %u and no cast.
280 LOG(0, "Thread %u has tid %u.\n", (unsigned int) (t- peer->thread), pid);
281 xseg_init_local_signal(xseg, portno);
284 LOG(5, "Thread %u executes function\n", (unsigned int) (t- peer->thread));
285 xseg_cancel_wait(xseg, portno);
292 for(loops= threshold; loops > 0; loops--) {
296 xseg_prepare_wait(xseg, portno);
298 // if (xq_count(&peer->xport->request_queue)){
// Reserve a peer_req before accepting.
299 pr = alloc_peer_req(peer);
301 accepted = xseg_accept(xseg, peer->portno);
302 LOG(5, "Thread %u accepted\n", (unsigned int) (t- peer->thread));
305 xseg_cancel_wait(xseg, portno);
306 wake_up_next_thread(peer);
307 handle_accepted(peer, pr, accepted);
// Presumably the path where nothing was accepted -- condition not visible.
311 free_peer_req(peer, pr);
315 // if (xq_count(&peer->xport->reply_queue)){
316 received = xseg_receive(xseg, peer->portno);
318 //printf("received req id: %u\n", received - xseg->requests);
319 //print_req(peer->xseg, received);
// The peer_req was stored as request data by submit_peer_req().
320 r = xseg_get_req_data(xseg, received, (void **) &pr);
322 //FIXME what to do here ?
323 LOG(0, "Received request with no pr data\n");
324 xseg_respond(peer->xseg, received, peer->portno, X_ALLOC);
326 //fail(peer, received);
327 //assert pr->req == received;
328 xseg_cancel_wait(xseg, portno);
329 wake_up_next_thread(peer);
330 handle_received(peer, pr, received);
335 LOG(1, "Thread %u goes to sleep\n", (unsigned int) (t- peer->thread));
336 xseg_wait_signal(xseg, 10000000UL);
337 xseg_cancel_wait(xseg, portno);
338 LOG(1, "Thread %u woke up\n", (unsigned int) (t- peer->thread));
/*
 * Defer a request to the peer's defer port.
 * Every visible line of the body is commented out, so as shown this function
 * does nothing; the disabled code would submit the request to
 * peer->defer_portno, signal that port and free pr.
 */
343 void defer_request(struct peerd *peer, struct peer_req *pr)
345 // assert canDefer(peer);
346 // xseg_submit(peer->xseg, peer->defer_portno, pr->req);
347 // xseg_signal(peer->xseg, peer->defer_portno);
348 // free_peer_req(peer, pr);
/*
 * Main-thread loop: call peer->interactive_func when one is set, then join
 * the first worker thread (peer->thread[0]).
 * NOTE(review): the return statement and any lines between the visible ones
 * are not shown in this excerpt.
 */
351 static int peerd_loop(struct peerd *peer)
353 if (peer->interactive_func)
354 peer->interactive_func();
356 pthread_join(peer->thread[0].tid, NULL);
/*
 * Join the xseg segment described by spec, using the "pthread" peer type.
 *
 * spec is parsed into a config and a join is attempted; a later path creates
 * the segment from that config and joins again, returning that result.
 *
 * NOTE(review): the condition separating the first join from the create
 * path is not visible in this excerpt (presumably the first join returning
 * NULL -- confirm).  The return values of xseg_parse_spec() and
 * xseg_create() are explicitly discarded with (void).
 */
361 static struct xseg *join(char *spec)
363 struct xseg_config config;
366 (void)xseg_parse_spec(spec, &config);
367 xseg = xseg_join(config.type, config.name, "pthread", NULL);
371 (void)xseg_create(&config);
372 return xseg_join(config.type, config.name, "pthread", NULL);
/*
 * Initialise and start every worker thread of the peer.
 *
 * For each of peer->nr_threads entries: set the back-pointer to peer,
 * initialise the condition variable and mutex with default attributes, and
 * create a pthread running thread_loop() with the entry as its argument.
 *
 * NOTE(review): func and arg are set to NULL after pthread_create(), so the
 * new thread may already be running when they are written -- confirm this
 * ordering is intended.  The return values of the pthread_* calls are not
 * checked in the visible lines; the declaration of `i` and the return
 * statement are not visible in this excerpt.
 */
375 int peerd_start_threads(struct peerd *peer)
378 uint32_t nr_threads = peer->nr_threads;
380 for (i = 0; i < nr_threads; i++) {
381 peer->thread[i].peer = peer;
382 pthread_cond_init(&peer->thread[i].cond,NULL);
383 pthread_mutex_init(&peer->thread[i].lock, NULL);
384 pthread_create(&peer->thread[i].tid, NULL, thread_loop, (void *)(peer->thread + i));
385 peer->thread[i].func = NULL;
386 peer->thread[i].arg = NULL;
/*
 * Allocate and set up a peerd.
 *
 * Visible steps: malloc the peerd; record nr_ops, defer_portno and
 * nr_threads; calloc the thread array and the peer_reqs array; set up the
 * free_reqs queue with xq_alloc_seq() and the threads queue with
 * xq_alloc_empty(); initialise the xseg library; join the segment named by
 * spec; bind portno and print the resulting port information; reset every
 * peer_req (peer back-pointer set, req/priv NULL, retval 0); clear
 * interactive_func.
 *
 * NOTE(review): the bodies of the error branches, any checks on the malloc
 * and thread calloc results, and the return statements are not visible in
 * this excerpt.
 */
392 static struct peerd* peerd_init(uint32_t nr_ops, char* spec, long portno, uint32_t nr_threads, uint32_t defer_portno)
396 peer = malloc(sizeof(struct peerd));
401 peer->nr_ops = nr_ops;
402 peer->defer_portno = defer_portno;
403 peer->nr_threads = nr_threads;
405 peer->thread = calloc(nr_threads, sizeof(struct thread));
408 peer->peer_reqs = calloc(nr_ops, sizeof(struct peer_req));
409 if (!peer->peer_reqs){
415 if (!xq_alloc_seq(&peer->free_reqs, nr_ops, nr_ops))
417 if (!xq_alloc_empty(&peer->threads, nr_threads))
420 if (xseg_initialize()){
421 printf("cannot initialize library\n");
424 peer->xseg = join(spec);
428 peer->xport = xseg_bind_port(peer->xseg, portno);
430 printf("cannot bind to port %ld\n", portno);
// Prints the value of the port pointer in hex.
433 printf("%lx\n", (unsigned long) peer->xport);
434 peer->portno = xseg_portno(peer->xseg, peer->xport);
435 printf("Peer on port %u/%u\n", peer->portno,
436 peer->xseg->config.nr_ports);
438 for (i = 0; i < nr_ops; i++) {
439 peer->peer_reqs[i].peer = peer;
440 peer->peer_reqs[i].req = NULL;
441 peer->peer_reqs[i].retval = 0;
442 peer->peer_reqs[i].priv = NULL;
444 peer->interactive_func = NULL;
/*
 * Program entry point.
 *
 * Parses the command line, sets the global verbosity, builds the peer with
 * peerd_init(), runs custom_peer_init(), starts the worker threads and then
 * returns the result of peerd_loop().
 *
 * Options visible here, each taking the following argument as its value:
 *   -g   (handling not visible in this excerpt; presumably the segment spec)
 *   -p   port number            (strtoul, base 10)
 *   -n   number of ops          (strtoul, base 10; default 16)
 *   -v   verbosity level        (atoi; default 0)
 *   -t   number of threads      (strtoul, base 10; default 16)
 *   -dp  defer port number      (strtoul, base 10; default NoPort)
 *
 * NOTE(review): none of the conversions are checked for errors in the
 * visible lines (see the TODOs below); the declarations of i, r, spec and
 * portno, and the handling of r after custom_peer_init(), are not visible in
 * this excerpt.
 */
449 int main(int argc, const char *argv[])
451 struct peerd *peer = NULL;
457 uint32_t nr_ops = 16;
458 uint32_t nr_threads = 16 ;
459 unsigned int debug_level = 0;
460 uint32_t defer_portno = NoPort;
462 //capture here -g spec, -n nr_ops, -p portno, -t nr_threads -v verbose level
463 // -dp xseg_portno to defer blocking requests
464 //maybe -l log file ?
465 //TODO print messages on arg parsing error
// Runs before `verbose` is raised from its initial 0, so this level-5
// message is not printed.
466 LOG(5, "Main thread has tid %ld.\n", syscall(SYS_gettid));
468 for (i = 1; i < argc; i++) {
469 if (!strcmp(argv[i], "-g") && i + 1 < argc) {
475 if (!strcmp(argv[i], "-p") && i + 1 < argc) {
476 portno = strtoul(argv[i+1], NULL, 10);
481 if (!strcmp(argv[i], "-n") && i + 1 < argc) {
482 nr_ops = strtoul(argv[i+1], NULL, 10);
486 if (!strcmp(argv[i], "-v") && i + 1 < argc ) {
487 debug_level = atoi(argv[i+1]);
491 if (!strcmp(argv[i], "-t") && i + 1 < argc ) {
492 nr_threads = strtoul(argv[i+1], NULL, 10);
496 if (!strcmp(argv[i], "-dp") && i + 1 < argc ) {
497 defer_portno = strtoul(argv[i+1], NULL, 10);
504 //TODO perform argument sanity checks
505 verbose = debug_level;
508 peer = peerd_init(nr_ops, spec, portno, nr_threads, defer_portno);
509 r = custom_peer_init(peer, argc, argv);
512 peerd_start_threads(peer);
513 return peerd_loop(peer);