root / xseg / peers / user / peer.c @ cb5cf301
History | View | Annotate | Download (22.9 kB)
1 |
/*
|
---|---|
2 |
* Copyright 2012 GRNET S.A. All rights reserved.
|
3 |
*
|
4 |
* Redistribution and use in source and binary forms, with or
|
5 |
* without modification, are permitted provided that the following
|
6 |
* conditions are met:
|
7 |
*
|
8 |
* 1. Redistributions of source code must retain the above
|
9 |
* copyright notice, this list of conditions and the following
|
10 |
* disclaimer.
|
11 |
* 2. Redistributions in binary form must reproduce the above
|
12 |
* copyright notice, this list of conditions and the following
|
13 |
* disclaimer in the documentation and/or other materials
|
14 |
* provided with the distribution.
|
15 |
*
|
16 |
* THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
|
17 |
* OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
|
18 |
* WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
|
19 |
* PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
|
20 |
* CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
|
21 |
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
|
22 |
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
|
23 |
* USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
|
24 |
* AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
|
25 |
* LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
|
26 |
* ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
|
27 |
* POSSIBILITY OF SUCH DAMAGE.
|
28 |
*
|
29 |
* The views and conclusions contained in the software and
|
30 |
* documentation are those of the authors and should not be
|
31 |
* interpreted as representing official policies, either expressed
|
32 |
* or implied, of GRNET S.A.
|
33 |
*/
|
34 |
|
35 |
#define _GNU_SOURCE
|
36 |
#include <stdio.h> |
37 |
#include <stdlib.h> |
38 |
#include <sys/types.h> |
39 |
#include <unistd.h> |
40 |
#include <sys/syscall.h> |
41 |
#include <sys/time.h> |
42 |
#include <signal.h> |
43 |
#include <sys/stat.h> |
44 |
#include <fcntl.h> |
45 |
#include <errno.h> |
46 |
#ifdef MT
|
47 |
#include <pthread.h> |
48 |
#endif
|
49 |
|
50 |
#include <xseg/xseg.h> |
51 |
#include <peer.h> |
52 |
|
53 |
#ifdef MT
|
54 |
#ifdef ST_THREADS
|
55 |
#error "MT and ST_THREADS defines are mutually exclusive" |
56 |
#endif
|
57 |
#endif
|
58 |
|
59 |
#ifdef MT
|
60 |
#define PEER_TYPE "pthread" |
61 |
#else
|
62 |
#define PEER_TYPE "posix" |
63 |
#endif
|
64 |
|
65 |
//FIXME this should not be defined here probably
|
66 |
#define MAX_SPEC_LEN 128 |
67 |
#define MAX_PIDFILE_LEN 512 |
68 |
|
69 |
volatile unsigned int terminated = 0; |
70 |
unsigned int verbose = 0; |
71 |
struct log_ctx lc;
|
72 |
#ifdef ST_THREADS
|
73 |
uint32_t ta = 0;
|
74 |
#endif
|
75 |
|
76 |
#ifdef MT
|
77 |
struct peerd *global_peer;
|
78 |
static pthread_key_t threadkey;
|
79 |
|
80 |
inline static int wake_up_next_thread(struct peerd *peer) |
81 |
{ |
82 |
return (xseg_signal(peer->xseg, peer->portno_start));
|
83 |
} |
84 |
#endif
|
85 |
|
86 |
/*
|
87 |
* extern is needed if this function is going to be called by another file
|
88 |
* such as bench-xseg.c
|
89 |
*/
|
90 |
|
91 |
void signal_handler(int signal) |
92 |
{ |
93 |
XSEGLOG2(&lc, I, "Caught signal. Terminating gracefully");
|
94 |
terminated = 1;
|
95 |
#ifdef MT
|
96 |
wake_up_next_thread(global_peer); |
97 |
#endif
|
98 |
} |
99 |
|
100 |
void renew_logfile(int signal) |
101 |
{ |
102 |
XSEGLOG2(&lc, I, "Caught signal. Renewing logfile");
|
103 |
renew_logctx(&lc, NULL, verbose, NULL, REOPEN_FILE); |
104 |
} |
105 |
|
106 |
static int setup_signals(struct peerd *peer) |
107 |
{ |
108 |
int r;
|
109 |
struct sigaction sa;
|
110 |
#ifdef MT
|
111 |
global_peer = peer; |
112 |
#endif
|
113 |
sigemptyset(&sa.sa_mask); |
114 |
sa.sa_flags = 0;
|
115 |
sa.sa_handler = signal_handler; |
116 |
r = sigaction(SIGTERM, &sa, NULL);
|
117 |
if (r < 0) |
118 |
return r;
|
119 |
r = sigaction(SIGINT, &sa, NULL);
|
120 |
if (r < 0) |
121 |
return r;
|
122 |
r = sigaction(SIGQUIT, &sa, NULL);
|
123 |
if (r < 0) |
124 |
return r;
|
125 |
|
126 |
sa.sa_handler = renew_logfile; |
127 |
r = sigaction(SIGUSR1, &sa, NULL);
|
128 |
if (r < 0) |
129 |
return r;
|
130 |
|
131 |
return r;
|
132 |
} |
133 |
|
134 |
inline int canDefer(struct peerd *peer) |
135 |
{ |
136 |
return !(peer->defer_portno == NoPort);
|
137 |
} |
138 |
|
139 |
void print_req(struct xseg *xseg, struct xseg_request *req) |
140 |
{ |
141 |
char target[64], data[64]; |
142 |
char *req_target, *req_data;
|
143 |
unsigned int end = (req->targetlen> 63) ? 63 : req->targetlen; |
144 |
req_target = xseg_get_target(xseg, req); |
145 |
req_data = xseg_get_data(xseg, req); |
146 |
|
147 |
if (1) { |
148 |
strncpy(target, req_target, end); |
149 |
target[end] = 0;
|
150 |
strncpy(data, req_data, 63);
|
151 |
data[63] = 0; |
152 |
printf("req id:%lu, op:%u %llu:%lu serviced: %lu, reqstate: %u\n"
|
153 |
"src: %u, transit: %u, dst: %u effective dst: %u\n"
|
154 |
"target[%u]:'%s', data[%llu]:\n%s------------------\n\n",
|
155 |
(unsigned long)(req), |
156 |
(unsigned int)req->op, |
157 |
(unsigned long long)req->offset, |
158 |
(unsigned long)req->size, |
159 |
(unsigned long)req->serviced, |
160 |
(unsigned int)req->state, |
161 |
(unsigned int)req->src_portno, |
162 |
(unsigned int)req->transit_portno, |
163 |
(unsigned int)req->dst_portno, |
164 |
(unsigned int)req->effective_dst_portno, |
165 |
(unsigned int)req->targetlen, target, |
166 |
(unsigned long long)req->datalen, data); |
167 |
} |
168 |
} |
169 |
void log_pr(char *msg, struct peer_req *pr) |
170 |
{ |
171 |
char target[64], data[64]; |
172 |
char *req_target, *req_data;
|
173 |
struct peerd *peer = pr->peer;
|
174 |
struct xseg *xseg = pr->peer->xseg;
|
175 |
req_target = xseg_get_target(xseg, pr->req); |
176 |
req_data = xseg_get_data(xseg, pr->req); |
177 |
/* null terminate name in case of req->target is less than 63 characters,
|
178 |
* and next character after name (aka first byte of next buffer) is not
|
179 |
* null
|
180 |
*/
|
181 |
unsigned int end = (pr->req->targetlen> 63) ? 63 : pr->req->targetlen; |
182 |
if (verbose) {
|
183 |
strncpy(target, req_target, end); |
184 |
target[end] = 0;
|
185 |
strncpy(data, req_data, 63);
|
186 |
data[63] = 0; |
187 |
printf("%s: req id:%u, op:%u %llu:%lu serviced: %lu, retval: %lu, reqstate: %u\n"
|
188 |
"target[%u]:'%s', data[%llu]:\n%s------------------\n\n",
|
189 |
msg, |
190 |
(unsigned int)(pr - peer->peer_reqs), |
191 |
(unsigned int)pr->req->op, |
192 |
(unsigned long long)pr->req->offset, |
193 |
(unsigned long)pr->req->size, |
194 |
(unsigned long)pr->req->serviced, |
195 |
(unsigned long)pr->retval, |
196 |
(unsigned int)pr->req->state, |
197 |
(unsigned int)pr->req->targetlen, target, |
198 |
(unsigned long long)pr->req->datalen, data); |
199 |
} |
200 |
} |
201 |
|
202 |
#ifdef MT
|
203 |
inline struct peer_req *alloc_peer_req(struct peerd *peer, struct thread *t) |
204 |
{ |
205 |
struct peer_req *pr;
|
206 |
xqindex idx = xq_pop_head(&t->free_thread_reqs, t->thread_no); |
207 |
if (idx != Noneidx)
|
208 |
goto out;
|
209 |
|
210 |
/* try to steal from another thread */
|
211 |
/*
|
212 |
int i;
|
213 |
struct thread *nt;
|
214 |
for (i = t->thread_no + 1; i < (t->thread_no + peer->nr_threads); i++) {
|
215 |
nt = &peer->thread[(t->thread_no + i) % peer->nr_threads];
|
216 |
if (!xq_count(&nt->free_thread_reqs))
|
217 |
continue;
|
218 |
idx = xq_pop_head(&nt->free_thread_reqs, t->thread_no);
|
219 |
if (idx != Noneidx)
|
220 |
goto out;
|
221 |
}
|
222 |
*/
|
223 |
return NULL; |
224 |
out:
|
225 |
pr = peer->peer_reqs + idx; |
226 |
pr->thread_no = t - peer->thread; |
227 |
return pr;
|
228 |
} |
229 |
#else
|
230 |
/*
|
231 |
* free_reqs is a queue that simply contains pointer offsets to the peer_reqs
|
232 |
* queue. If a pointer from peer_reqs is popped, we are certain that the
|
233 |
* associated memory in peer_reqs is free to use
|
234 |
*/
|
235 |
inline struct peer_req *alloc_peer_req(struct peerd *peer) |
236 |
{ |
237 |
xqindex idx = xq_pop_head(&peer->free_reqs, 1);
|
238 |
if (idx == Noneidx)
|
239 |
return NULL; |
240 |
return peer->peer_reqs + idx;
|
241 |
} |
242 |
#endif
|
243 |
|
244 |
inline void free_peer_req(struct peerd *peer, struct peer_req *pr) |
245 |
{ |
246 |
xqindex idx = pr - peer->peer_reqs; |
247 |
pr->req = NULL;
|
248 |
#ifdef MT
|
249 |
struct thread *t = &peer->thread[pr->thread_no];
|
250 |
xq_append_head(&t->free_thread_reqs, idx, 1);
|
251 |
#else
|
252 |
xq_append_head(&peer->free_reqs, idx, 1);
|
253 |
#endif
|
254 |
} |
255 |
|
256 |
/*
|
257 |
* Count all free reqs in peer.
|
258 |
* Racy, if multithreaded, but the sum should monotonicly increase when checked
|
259 |
* after a termination signal is catched.
|
260 |
*/
|
261 |
int all_peer_reqs_free(struct peerd *peer) |
262 |
{ |
263 |
uint32_t free_reqs = 0;
|
264 |
#ifdef MT
|
265 |
int i;
|
266 |
for (i = 0; i < peer->nr_threads; i++) { |
267 |
free_reqs += xq_count(&peer->thread[i].free_thread_reqs); |
268 |
} |
269 |
#else
|
270 |
free_reqs = xq_count(&peer->free_reqs); |
271 |
#endif
|
272 |
if (free_reqs == peer->nr_ops)
|
273 |
return 1; |
274 |
return 0; |
275 |
} |
276 |
|
277 |
struct timeval resp_start, resp_end, resp_accum = {0, 0}; |
278 |
uint64_t responds = 0;
|
279 |
void get_responds_stats(){
|
280 |
printf("Time waiting respond %lu.%06lu sec for %llu times.\n",
|
281 |
//(unsigned int)(t - peer->thread),
|
282 |
resp_accum.tv_sec, resp_accum.tv_usec, (long long unsigned int) responds); |
283 |
} |
284 |
|
285 |
//FIXME error check
|
286 |
void fail(struct peerd *peer, struct peer_req *pr) |
287 |
{ |
288 |
struct xseg_request *req = pr->req;
|
289 |
uint32_t p; |
290 |
if (req){
|
291 |
XSEGLOG2(&lc, D, "failing req %u", (unsigned int) (pr - peer->peer_reqs)); |
292 |
req->state |= XS_FAILED; |
293 |
//xseg_set_req_data(peer->xseg, pr->req, NULL);
|
294 |
p = xseg_respond(peer->xseg, req, pr->portno, X_ALLOC); |
295 |
xseg_signal(peer->xseg, p); |
296 |
} |
297 |
free_peer_req(peer, pr); |
298 |
#ifdef MT
|
299 |
wake_up_next_thread(peer); |
300 |
#endif
|
301 |
} |
302 |
|
303 |
//FIXME error check
|
304 |
void complete(struct peerd *peer, struct peer_req *pr) |
305 |
{ |
306 |
struct xseg_request *req = pr->req;
|
307 |
uint32_t p; |
308 |
if (req){
|
309 |
req->state |= XS_SERVED; |
310 |
//xseg_set_req_data(peer->xseg, pr->req, NULL);
|
311 |
//gettimeofday(&resp_start, NULL);
|
312 |
p = xseg_respond(peer->xseg, req, pr->portno, X_ALLOC); |
313 |
//gettimeofday(&resp_end, NULL);
|
314 |
//responds++;
|
315 |
//timersub(&resp_end, &resp_start, &resp_end);
|
316 |
//timeradd(&resp_end, &resp_accum, &resp_accum);
|
317 |
//printf("xseg_signal: %u\n", p);
|
318 |
xseg_signal(peer->xseg, p); |
319 |
} |
320 |
free_peer_req(peer, pr); |
321 |
#ifdef MT
|
322 |
wake_up_next_thread(peer); |
323 |
#endif
|
324 |
} |
325 |
|
326 |
static void handle_accepted(struct peerd *peer, struct peer_req *pr, |
327 |
struct xseg_request *req)
|
328 |
{ |
329 |
struct xseg_request *xreq = pr->req;
|
330 |
//assert xreq == req;
|
331 |
XSEGLOG2(&lc, D, "Handle accepted");
|
332 |
xreq->serviced = 0;
|
333 |
//xreq->state = XS_ACCEPTED;
|
334 |
pr->retval = 0;
|
335 |
dispatch(peer, pr, req, dispatch_accept); |
336 |
} |
337 |
|
338 |
static void handle_received(struct peerd *peer, struct peer_req *pr, |
339 |
struct xseg_request *req)
|
340 |
{ |
341 |
//struct xseg_request *req = pr->req;
|
342 |
//assert req->state != XS_ACCEPTED;
|
343 |
XSEGLOG2(&lc, D, "Handle received \n");
|
344 |
dispatch(peer, pr, req, dispatch_receive); |
345 |
|
346 |
} |
347 |
struct timeval sub_start, sub_end, sub_accum = {0, 0}; |
348 |
uint64_t submits = 0;
|
349 |
void get_submits_stats(){
|
350 |
printf("Time waiting submit %lu.%06lu sec for %llu times.\n",
|
351 |
//(unsigned int)(t - peer->thread),
|
352 |
sub_accum.tv_sec, sub_accum.tv_usec, (long long unsigned int) submits); |
353 |
} |
354 |
|
355 |
int submit_peer_req(struct peerd *peer, struct peer_req *pr) |
356 |
{ |
357 |
uint32_t ret; |
358 |
struct xseg_request *req = pr->req;
|
359 |
// assert req->portno == peer->portno ?
|
360 |
//TODO small function with error checking
|
361 |
XSEGLOG2 (&lc, D, "submitting peer req %u\n", (unsigned int)(pr - peer->peer_reqs)); |
362 |
ret = xseg_set_req_data(peer->xseg, req, (void *)(pr));
|
363 |
if (ret < 0) |
364 |
return -1; |
365 |
//printf("pr: %x , req_data: %x \n", pr, xseg_get_req_data(peer->xseg, req));
|
366 |
//gettimeofday(&sub_start, NULL);
|
367 |
ret = xseg_submit(peer->xseg, req, pr->portno, X_ALLOC); |
368 |
//gettimeofday(&sub_end, NULL);
|
369 |
//submits++;
|
370 |
//timersub(&sub_end, &sub_start, &sub_end);
|
371 |
//timeradd(&sub_end, &sub_accum, &sub_accum);
|
372 |
if (ret == NoPort)
|
373 |
return -1; |
374 |
xseg_signal(peer->xseg, ret); |
375 |
return 0; |
376 |
} |
377 |
|
378 |
#ifdef MT
|
379 |
int check_ports(struct peerd *peer, struct thread *t) |
380 |
#else
|
381 |
int check_ports(struct peerd *peer) |
382 |
#endif
|
383 |
{ |
384 |
struct xseg *xseg = peer->xseg;
|
385 |
xport portno_start = peer->portno_start; |
386 |
xport portno_end = peer->portno_end; |
387 |
struct xseg_request *accepted, *received;
|
388 |
struct peer_req *pr;
|
389 |
xport i; |
390 |
int r, c = 0; |
391 |
|
392 |
for (i = portno_start; i <= portno_end; i++) {
|
393 |
accepted = NULL;
|
394 |
received = NULL;
|
395 |
if (!isTerminate()) {
|
396 |
#ifdef MT
|
397 |
pr = alloc_peer_req(peer, t); |
398 |
#else
|
399 |
pr = alloc_peer_req(peer); |
400 |
#endif
|
401 |
if (pr) {
|
402 |
accepted = xseg_accept(xseg, i, X_NONBLOCK); |
403 |
if (accepted) {
|
404 |
pr->req = accepted; |
405 |
pr->portno = i; |
406 |
xseg_cancel_wait(xseg, i); |
407 |
handle_accepted(peer, pr, accepted); |
408 |
c = 1;
|
409 |
} |
410 |
else {
|
411 |
free_peer_req(peer, pr); |
412 |
} |
413 |
} |
414 |
} |
415 |
received = xseg_receive(xseg, i, X_NONBLOCK); |
416 |
if (received) {
|
417 |
r = xseg_get_req_data(xseg, received, (void **) &pr);
|
418 |
if (r < 0 || !pr){ |
419 |
XSEGLOG2(&lc, W, "Received request with no pr data\n");
|
420 |
xport p = xseg_respond(peer->xseg, received, peer->portno_start, X_ALLOC); |
421 |
if (p == NoPort){
|
422 |
XSEGLOG2(&lc, W, "Could not respond stale request");
|
423 |
xseg_put_request(xseg, received, portno_start); |
424 |
continue;
|
425 |
} else {
|
426 |
xseg_signal(xseg, p); |
427 |
} |
428 |
} else {
|
429 |
//maybe perform sanity check for pr
|
430 |
xseg_cancel_wait(xseg, i); |
431 |
handle_received(peer, pr, received); |
432 |
c = 1;
|
433 |
} |
434 |
} |
435 |
} |
436 |
|
437 |
return c;
|
438 |
} |
439 |
|
440 |
#ifdef MT
|
441 |
static void* thread_loop(void *arg) |
442 |
{ |
443 |
struct thread *t = (struct thread *) arg; |
444 |
struct peerd *peer = t->peer;
|
445 |
struct xseg *xseg = peer->xseg;
|
446 |
xport portno_start = peer->portno_start; |
447 |
xport portno_end = peer->portno_end; |
448 |
pid_t pid =syscall(SYS_gettid); |
449 |
uint64_t loops; |
450 |
uint64_t threshold=1000/(1 + portno_end - portno_start); |
451 |
|
452 |
XSEGLOG2(&lc, D, "thread %u\n", (unsigned int) (t- peer->thread)); |
453 |
XSEGLOG2(&lc, I, "Thread %u has tid %u.\n", (unsigned int) (t- peer->thread), pid); |
454 |
xseg_init_local_signal(xseg, peer->portno_start); |
455 |
for (;!(isTerminate() && xq_count(&peer->free_reqs) == peer->nr_ops);) {
|
456 |
for(loops = threshold; loops > 0; loops--) { |
457 |
if (loops == 1) |
458 |
xseg_prepare_wait(xseg, peer->portno_start); |
459 |
if (check_ports(peer, t))
|
460 |
loops = threshold; |
461 |
} |
462 |
XSEGLOG2(&lc, I, "Thread %u goes to sleep\n", (unsigned int) (t- peer->thread)); |
463 |
xseg_wait_signal(xseg, 10000000UL);
|
464 |
xseg_cancel_wait(xseg, peer->portno_start); |
465 |
XSEGLOG2(&lc, I, "Thread %u woke up\n", (unsigned int) (t- peer->thread)); |
466 |
} |
467 |
wake_up_next_thread(peer); |
468 |
custom_peer_finalize(peer); |
469 |
return NULL; |
470 |
} |
471 |
|
472 |
void *init_thread_loop(void *arg) |
473 |
{ |
474 |
struct thread *t = (struct thread *) arg; |
475 |
struct peerd *peer = t->peer;
|
476 |
char *thread_id;
|
477 |
int i;
|
478 |
|
479 |
/*
|
480 |
* We need an identifier for every thread that will spin in peerd_loop.
|
481 |
* The following code is a way to create a string of this format:
|
482 |
* "Thread <num>"
|
483 |
* minus the null terminator. What we do is we create this string with
|
484 |
* snprintf and then resize it to exclude the null terminator with
|
485 |
* realloc. Finally, the result string is passed to the (void *arg) field
|
486 |
* of struct thread.
|
487 |
*
|
488 |
* Since the highest thread number can't be more than 5 digits, using 13
|
489 |
* chars should be more than enough.
|
490 |
*/
|
491 |
thread_id = malloc(13 * sizeof(char)); |
492 |
snprintf(thread_id, 13, "Thread %ld", t - t->peer->thread); |
493 |
for (i = 0; thread_id[i]; i++) {} |
494 |
t->arg = (void *)realloc(thread_id, i-1); |
495 |
pthread_setspecific(threadkey, t); |
496 |
|
497 |
//Start thread loop
|
498 |
(void)peer->peerd_loop(t);
|
499 |
|
500 |
wake_up_next_thread(peer); |
501 |
custom_peer_finalize(peer); |
502 |
|
503 |
return NULL; |
504 |
} |
505 |
|
506 |
int peerd_start_threads(struct peerd *peer) |
507 |
{ |
508 |
int i;
|
509 |
uint32_t nr_threads = peer->nr_threads; |
510 |
//TODO err check
|
511 |
for (i = 0; i < nr_threads; i++) { |
512 |
peer->thread[i].thread_no = i; |
513 |
peer->thread[i].peer = peer; |
514 |
pthread_create(&peer->thread[i].tid, NULL,
|
515 |
init_thread_loop, (void *)(peer->thread + i));
|
516 |
} |
517 |
|
518 |
if (peer->interactive_func)
|
519 |
peer->interactive_func(); |
520 |
for (i = 0; i < nr_threads; i++) { |
521 |
pthread_join(peer->thread[i].tid, NULL);
|
522 |
} |
523 |
|
524 |
return 0; |
525 |
} |
526 |
#endif
|
527 |
|
528 |
int defer_request(struct peerd *peer, struct peer_req *pr) |
529 |
{ |
530 |
int r;
|
531 |
xport p; |
532 |
if (!canDefer(peer)){
|
533 |
XSEGLOG2(&lc, E, "Peer cannot defer requests");
|
534 |
return -1; |
535 |
} |
536 |
p = xseg_forward(peer->xseg, pr->req, peer->defer_portno, pr->portno, |
537 |
X_ALLOC); |
538 |
if (p == NoPort){
|
539 |
XSEGLOG2(&lc, E, "Cannot defer request %lx", pr->req);
|
540 |
return -1; |
541 |
} |
542 |
r = xseg_signal(peer->xseg, p); |
543 |
if (r < 0) { |
544 |
XSEGLOG2(&lc, W, "Cannot signal port %lu", p);
|
545 |
} |
546 |
free_peer_req(peer, pr); |
547 |
return 0; |
548 |
} |
549 |
|
550 |
/*
|
551 |
* generic_peerd_loop is a general-purpose port-checker loop that is
|
552 |
* suitable both for multi-threaded and single-threaded peers.
|
553 |
*/
|
554 |
static int generic_peerd_loop(void *arg) |
555 |
{ |
556 |
#ifdef MT
|
557 |
struct thread *t = (struct thread *) arg; |
558 |
struct peerd *peer = t->peer;
|
559 |
char *id = t->arg;
|
560 |
#else
|
561 |
struct peerd *peer = (struct peerd *) arg; |
562 |
char id[4] = {'P','e','e','r'}; |
563 |
#endif
|
564 |
struct xseg *xseg = peer->xseg;
|
565 |
xport portno_start = peer->portno_start; |
566 |
xport portno_end = peer->portno_end; |
567 |
pid_t pid = syscall(SYS_gettid); |
568 |
uint64_t threshold=1000/(1 + portno_end - portno_start); |
569 |
uint64_t loops; |
570 |
|
571 |
XSEGLOG2(&lc, I, "%s has tid %u.\n", id, pid);
|
572 |
xseg_init_local_signal(xseg, peer->portno_start); |
573 |
//for (;!(isTerminate() && xq_count(&peer->free_reqs) == peer->nr_ops);) {
|
574 |
for (;!(isTerminate() && all_peer_reqs_free(peer));) {
|
575 |
//Heart of peerd_loop. This loop is common for everyone.
|
576 |
for(loops = threshold; loops > 0; loops--) { |
577 |
if (loops == 1) |
578 |
xseg_prepare_wait(xseg, peer->portno_start); |
579 |
#ifdef MT
|
580 |
if (check_ports(peer, t))
|
581 |
#else
|
582 |
if (check_ports(peer))
|
583 |
#endif
|
584 |
loops = threshold; |
585 |
} |
586 |
#ifdef ST_THREADS
|
587 |
if (ta){
|
588 |
st_sleep(0);
|
589 |
continue;
|
590 |
} |
591 |
#endif
|
592 |
XSEGLOG2(&lc, I, "%s goes to sleep\n", id);
|
593 |
xseg_wait_signal(xseg, 10000000UL);
|
594 |
xseg_cancel_wait(xseg, peer->portno_start); |
595 |
XSEGLOG2(&lc, I, "%s woke up\n", id);
|
596 |
} |
597 |
return 0; |
598 |
} |
599 |
|
600 |
static int init_peerd_loop(struct peerd *peer) |
601 |
{ |
602 |
struct xseg *xseg = peer->xseg;
|
603 |
|
604 |
peer->peerd_loop(peer); |
605 |
custom_peer_finalize(peer); |
606 |
xseg_quit_local_signal(xseg, peer->portno_start); |
607 |
|
608 |
return 0; |
609 |
} |
610 |
|
611 |
static struct xseg *join(char *spec) |
612 |
{ |
613 |
struct xseg_config config;
|
614 |
struct xseg *xseg;
|
615 |
|
616 |
(void)xseg_parse_spec(spec, &config);
|
617 |
xseg = xseg_join(config.type, config.name, PEER_TYPE, NULL);
|
618 |
if (xseg)
|
619 |
return xseg;
|
620 |
|
621 |
(void)xseg_create(&config);
|
622 |
return xseg_join(config.type, config.name, PEER_TYPE, NULL); |
623 |
} |
624 |
|
625 |
static struct peerd* peerd_init(uint32_t nr_ops, char* spec, long portno_start, |
626 |
long portno_end, uint32_t nr_threads, xport defer_portno)
|
627 |
{ |
628 |
int i;
|
629 |
struct peerd *peer;
|
630 |
struct xseg_port *port;
|
631 |
void *sd = NULL; |
632 |
xport p; |
633 |
|
634 |
#ifdef ST_THREADS
|
635 |
st_init(); |
636 |
#endif
|
637 |
peer = malloc(sizeof(struct peerd)); |
638 |
if (!peer) {
|
639 |
perror("malloc");
|
640 |
return NULL; |
641 |
} |
642 |
peer->nr_ops = nr_ops; |
643 |
peer->defer_portno = defer_portno; |
644 |
#ifdef MT
|
645 |
peer->nr_threads = nr_threads; |
646 |
peer->thread = calloc(nr_threads, sizeof(struct thread)); |
647 |
if (!peer->thread)
|
648 |
goto malloc_fail;
|
649 |
if (!xq_alloc_empty(&peer->threads, nr_threads))
|
650 |
goto malloc_fail;
|
651 |
for (i = 0; i < nr_threads; i++) { |
652 |
if (!xq_alloc_empty(&peer->thread[i].free_thread_reqs, nr_ops))
|
653 |
goto malloc_fail;
|
654 |
} |
655 |
for (i = 0; i < nr_ops; i++) { |
656 |
__xq_append_head(&peer->thread[i % nr_threads].free_thread_reqs, (xqindex)i); |
657 |
} |
658 |
|
659 |
pthread_key_create(&threadkey, NULL);
|
660 |
#else
|
661 |
if (!xq_alloc_seq(&peer->free_reqs, nr_ops, nr_ops))
|
662 |
goto malloc_fail;
|
663 |
#endif
|
664 |
peer->peer_reqs = calloc(nr_ops, sizeof(struct peer_req)); |
665 |
if (!peer->peer_reqs){
|
666 |
malloc_fail:
|
667 |
perror("malloc");
|
668 |
return NULL; |
669 |
} |
670 |
if (xseg_initialize()){
|
671 |
printf("cannot initialize library\n");
|
672 |
return NULL; |
673 |
} |
674 |
peer->xseg = join(spec); |
675 |
if (!peer->xseg)
|
676 |
return NULL; |
677 |
|
678 |
peer->portno_start = (xport) portno_start; |
679 |
peer->portno_end= (xport) portno_end; |
680 |
|
681 |
/*
|
682 |
* Start binding ports from portno_start to portno_end.
|
683 |
* The first port we bind will have its signal_desc initialized by xseg
|
684 |
* and the same signal_desc will be used for all the other ports.
|
685 |
*/
|
686 |
for (p = peer->portno_start; p <= peer->portno_end; p++) {
|
687 |
port = xseg_bind_port(peer->xseg, p, sd); |
688 |
if (!port){
|
689 |
printf("cannot bind to port %u\n", (unsigned int) p); |
690 |
return NULL; |
691 |
} |
692 |
if (p == peer->portno_start)
|
693 |
sd = xseg_get_signal_desc(peer->xseg, port); |
694 |
} |
695 |
|
696 |
printf("Peer on ports %u-%u\n", peer->portno_start, peer->portno_end);
|
697 |
|
698 |
for (i = 0; i < nr_ops; i++) { |
699 |
peer->peer_reqs[i].peer = peer; |
700 |
peer->peer_reqs[i].req = NULL;
|
701 |
peer->peer_reqs[i].retval = 0;
|
702 |
peer->peer_reqs[i].priv = NULL;
|
703 |
peer->peer_reqs[i].portno = NoPort; |
704 |
|
705 |
//Plug default peerd_loop. This can change later on by custom_peer_init.
|
706 |
peer->peerd_loop = generic_peerd_loop; |
707 |
|
708 |
#ifdef ST_THREADS
|
709 |
peer->peer_reqs[i].cond = st_cond_new(); //FIXME err check
|
710 |
#endif
|
711 |
} |
712 |
#ifdef MT
|
713 |
peer->interactive_func = NULL;
|
714 |
#endif
|
715 |
return peer;
|
716 |
|
717 |
} |
718 |
|
719 |
int pidfile_remove(char *path, int fd) |
720 |
{ |
721 |
close(fd); |
722 |
return (unlink(path));
|
723 |
} |
724 |
|
725 |
int pidfile_write(int pid_fd) |
726 |
{ |
727 |
char buf[16]; |
728 |
snprintf(buf, sizeof(buf), "%ld", syscall(SYS_gettid)); |
729 |
buf[15] = 0; |
730 |
|
731 |
lseek(pid_fd, 0, SEEK_SET);
|
732 |
int ret = write(pid_fd, buf, strlen(buf));
|
733 |
return ret;
|
734 |
} |
735 |
|
736 |
int pidfile_read(char *path, pid_t *pid) |
737 |
{ |
738 |
char buf[16], *endptr; |
739 |
*pid = 0;
|
740 |
|
741 |
int fd = open(path, O_RDONLY);
|
742 |
if (fd < 0) |
743 |
return -1; |
744 |
int ret = read(fd, buf, 15); |
745 |
buf[15]=0; |
746 |
close(fd); |
747 |
if (ret < 0) |
748 |
return -1; |
749 |
else{
|
750 |
*pid = strtol(buf, &endptr, 10);
|
751 |
if (endptr != &buf[ret]){
|
752 |
*pid = 0;
|
753 |
return -1; |
754 |
} |
755 |
} |
756 |
return 0; |
757 |
} |
758 |
|
759 |
int pidfile_open(char *path, pid_t *old_pid) |
760 |
{ |
761 |
//nfs version > 3
|
762 |
int fd = open(path, O_CREAT|O_EXCL|O_WRONLY, S_IWUSR);
|
763 |
if (fd < 0){ |
764 |
if (errno == EEXIST)
|
765 |
pidfile_read(path, old_pid); |
766 |
} |
767 |
return fd;
|
768 |
} |
769 |
|
770 |
void usage(char *argv0) |
771 |
{ |
772 |
fprintf(stderr, "Usage: %s [general options] [custom peer options]\n\n", argv0);
|
773 |
fprintf(stderr, "General peer options:\n"
|
774 |
" Option | Default | \n"
|
775 |
" --------------------------------------------\n"
|
776 |
" -g | None | Segment spec to join\n"
|
777 |
" -sp | NoPort | Start portno to bind\n"
|
778 |
" -ep | NoPort | End portno to bind\n"
|
779 |
" -p | NoPort | Portno to bind\n"
|
780 |
" -n | 16 | Number of ops\n"
|
781 |
" -v | 0 | Verbosity level\n"
|
782 |
" -l | None | Logfile \n"
|
783 |
" -d | No | Daemonize \n"
|
784 |
" --pidfile | None | Pidfile \n"
|
785 |
#ifdef MT
|
786 |
" -t | No | Number of threads \n"
|
787 |
#endif
|
788 |
"\n"
|
789 |
); |
790 |
custom_peer_usage(); |
791 |
} |
792 |
|
793 |
int main(int argc, char *argv[]) |
794 |
{ |
795 |
struct peerd *peer = NULL; |
796 |
//parse args
|
797 |
int r;
|
798 |
long portno_start = -1, portno_end = -1, portno = -1; |
799 |
|
800 |
//set defaults here
|
801 |
int daemonize = 0, help = 0; |
802 |
uint32_t nr_ops = 16;
|
803 |
uint32_t nr_threads = 1;
|
804 |
unsigned int debug_level = 0; |
805 |
xport defer_portno = NoPort; |
806 |
pid_t old_pid; |
807 |
int pid_fd = -1; |
808 |
|
809 |
char spec[MAX_SPEC_LEN + 1]; |
810 |
char logfile[MAX_LOGFILE_LEN + 1]; |
811 |
char pidfile[MAX_PIDFILE_LEN + 1]; |
812 |
|
813 |
logfile[0] = 0; |
814 |
pidfile[0] = 0; |
815 |
spec[0] = 0; |
816 |
|
817 |
//capture here -g spec, -n nr_ops, -p portno, -t nr_threads -v verbose level
|
818 |
// -dp xseg_portno to defer blocking requests
|
819 |
// -l log file ?
|
820 |
//TODO print messages on arg parsing error
|
821 |
BEGIN_READ_ARGS(argc, argv); |
822 |
READ_ARG_STRING("-g", spec, MAX_SPEC_LEN);
|
823 |
READ_ARG_ULONG("-sp", portno_start);
|
824 |
READ_ARG_ULONG("-ep", portno_end);
|
825 |
READ_ARG_ULONG("-p", portno);
|
826 |
READ_ARG_ULONG("-n", nr_ops);
|
827 |
READ_ARG_ULONG("-v", debug_level);
|
828 |
#ifdef MT
|
829 |
READ_ARG_ULONG("-t", nr_threads);
|
830 |
#endif
|
831 |
READ_ARG_ULONG("-dp", defer_portno);
|
832 |
READ_ARG_STRING("-l", logfile, MAX_LOGFILE_LEN);
|
833 |
READ_ARG_BOOL("-d", daemonize);
|
834 |
READ_ARG_BOOL("-h", help);
|
835 |
READ_ARG_BOOL("--help", help);
|
836 |
READ_ARG_STRING("--pidfile", pidfile, MAX_PIDFILE_LEN);
|
837 |
END_READ_ARGS(); |
838 |
|
839 |
if (help){
|
840 |
usage(argv[0]);
|
841 |
return 0; |
842 |
} |
843 |
|
844 |
r = init_logctx(&lc, argv[0], debug_level, logfile,
|
845 |
REDIRECT_STDOUT|REDIRECT_STDERR); |
846 |
if (r < 0){ |
847 |
XSEGLOG("Cannot initialize logging to logfile");
|
848 |
return -1; |
849 |
} |
850 |
XSEGLOG2(&lc, D, "Main thread has tid %ld.\n", syscall(SYS_gettid));
|
851 |
|
852 |
if (pidfile[0]){ |
853 |
pid_fd = pidfile_open(pidfile, &old_pid); |
854 |
if (pid_fd < 0) { |
855 |
if (old_pid) {
|
856 |
XSEGLOG2(&lc, E, "Daemon already running, pid: %d.", old_pid);
|
857 |
} else {
|
858 |
XSEGLOG2(&lc, E, "Cannot open or create pidfile");
|
859 |
} |
860 |
return -1; |
861 |
} |
862 |
} |
863 |
|
864 |
if (daemonize){
|
865 |
if (daemon(0, 1) < 0){ |
866 |
XSEGLOG2(&lc, E, "Cannot daemonize");
|
867 |
r = -1;
|
868 |
goto out;
|
869 |
} |
870 |
} |
871 |
|
872 |
pidfile_write(pid_fd); |
873 |
|
874 |
//TODO perform argument sanity checks
|
875 |
verbose = debug_level; |
876 |
if (portno != -1) { |
877 |
portno_start = portno; |
878 |
portno_end = portno; |
879 |
} |
880 |
if (portno_start == -1 || portno_end == -1){ |
881 |
XSEGLOG2(&lc, E, "Portno or {portno_start, portno_end} must be supplied");
|
882 |
usage(argv[0]);
|
883 |
r = -1;
|
884 |
goto out;
|
885 |
} |
886 |
|
887 |
peer = peerd_init(nr_ops, spec, portno_start, portno_end, nr_threads, defer_portno); |
888 |
if (!peer){
|
889 |
r = -1;
|
890 |
goto out;
|
891 |
} |
892 |
setup_signals(peer); |
893 |
r = custom_peer_init(peer, argc, argv); |
894 |
if (r < 0) |
895 |
goto out;
|
896 |
#if defined(MT)
|
897 |
//TODO err check
|
898 |
peerd_start_threads(peer); |
899 |
#elif defined(ST_THREADS)
|
900 |
st_thread_t st = st_thread_create(init_peerd_loop, peer, 1, 0); |
901 |
r = st_thread_join(st, NULL);
|
902 |
#else
|
903 |
r = init_peerd_loop(peer); |
904 |
#endif
|
905 |
out:
|
906 |
if (pid_fd > 0) |
907 |
pidfile_remove(pidfile, pid_fd); |
908 |
return r;
|
909 |
} |