Statistics
| Branch: | Tag: | Revision:

root / xseg / peers / user / peer.c @ cb5cf301

History | View | Annotate | Download (22.9 kB)

1
/*
2
 * Copyright 2012 GRNET S.A. All rights reserved.
3
 *
4
 * Redistribution and use in source and binary forms, with or
5
 * without modification, are permitted provided that the following
6
 * conditions are met:
7
 *
8
 *   1. Redistributions of source code must retain the above
9
 *      copyright notice, this list of conditions and the following
10
 *      disclaimer.
11
 *   2. Redistributions in binary form must reproduce the above
12
 *      copyright notice, this list of conditions and the following
13
 *      disclaimer in the documentation and/or other materials
14
 *      provided with the distribution.
15
 *
16
 * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17
 * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18
 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19
 * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20
 * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21
 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22
 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23
 * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24
 * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25
 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26
 * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27
 * POSSIBILITY OF SUCH DAMAGE.
28
 *
29
 * The views and conclusions contained in the software and
30
 * documentation are those of the authors and should not be
31
 * interpreted as representing official policies, either expressed
32
 * or implied, of GRNET S.A.
33
 */
34

    
35
#define _GNU_SOURCE
36
#include <stdio.h>
37
#include <stdlib.h>
38
#include <sys/types.h>
39
#include <unistd.h>
40
#include <sys/syscall.h>
41
#include <sys/time.h>
42
#include <signal.h>
43
#include <sys/stat.h>
44
#include <fcntl.h>
45
#include <errno.h>
46
#ifdef MT
47
#include <pthread.h>
48
#endif
49

    
50
#include <xseg/xseg.h>
51
#include <peer.h>
52

    
53
#ifdef MT
54
#ifdef ST_THREADS
55
#error "MT and ST_THREADS defines are mutually exclusive"
56
#endif
57
#endif
58

    
59
#ifdef MT
60
#define PEER_TYPE "pthread"
61
#else
62
#define PEER_TYPE "posix"
63
#endif
64

    
65
//FIXME this should not be defined here probably
66
#define MAX_SPEC_LEN 128
67
#define MAX_PIDFILE_LEN 512
68

    
69
volatile unsigned int terminated = 0;
70
unsigned int verbose = 0;
71
struct log_ctx lc;
72
#ifdef ST_THREADS
73
uint32_t ta = 0;
74
#endif
75

    
76
#ifdef MT
77
struct peerd *global_peer;
78
static pthread_key_t threadkey;
79

    
80
inline static int wake_up_next_thread(struct peerd *peer)
81
{
82
        return (xseg_signal(peer->xseg, peer->portno_start));
83
}
84
#endif
85

    
86
/*
87
 * extern is needed if this function is going to be called by another file
88
 * such as bench-xseg.c
89
 */
90

    
91
void signal_handler(int signal)
92
{
93
        XSEGLOG2(&lc, I, "Caught signal. Terminating gracefully");
94
        terminated = 1;
95
#ifdef MT
96
        wake_up_next_thread(global_peer);
97
#endif
98
}
99

    
100
void renew_logfile(int signal)
101
{
102
        XSEGLOG2(&lc, I, "Caught signal. Renewing logfile");
103
        renew_logctx(&lc, NULL, verbose, NULL, REOPEN_FILE);
104
}
105

    
106
static int setup_signals(struct peerd *peer)
107
{
108
        int r;
109
        struct sigaction sa;
110
#ifdef MT
111
        global_peer = peer;
112
#endif
113
        sigemptyset(&sa.sa_mask);
114
        sa.sa_flags = 0;
115
        sa.sa_handler = signal_handler;
116
        r = sigaction(SIGTERM, &sa, NULL);
117
        if (r < 0)
118
                return r;
119
        r = sigaction(SIGINT, &sa, NULL);
120
        if (r < 0)
121
                return r;
122
        r = sigaction(SIGQUIT, &sa, NULL);
123
        if (r < 0)
124
                return r;
125

    
126
        sa.sa_handler = renew_logfile;
127
        r = sigaction(SIGUSR1, &sa, NULL);
128
        if (r < 0)
129
                return r;
130

    
131
        return r;
132
}
133

    
134
inline int canDefer(struct peerd *peer)
135
{
136
        return !(peer->defer_portno == NoPort);
137
}
138

    
139
void print_req(struct xseg *xseg, struct xseg_request *req)
140
{
141
        char target[64], data[64];
142
        char *req_target, *req_data;
143
        unsigned int end = (req->targetlen> 63) ? 63 : req->targetlen;
144
        req_target = xseg_get_target(xseg, req);
145
        req_data = xseg_get_data(xseg, req);
146

    
147
        if (1) {
148
                strncpy(target, req_target, end);
149
                target[end] = 0;
150
                strncpy(data, req_data, 63);
151
                data[63] = 0;
152
                printf("req id:%lu, op:%u %llu:%lu serviced: %lu, reqstate: %u\n"
153
                                "src: %u, transit: %u, dst: %u effective dst: %u\n"
154
                                "target[%u]:'%s', data[%llu]:\n%s------------------\n\n",
155
                                (unsigned long)(req),
156
                                (unsigned int)req->op,
157
                                (unsigned long long)req->offset,
158
                                (unsigned long)req->size,
159
                                (unsigned long)req->serviced,
160
                                (unsigned int)req->state,
161
                                (unsigned int)req->src_portno,
162
                                (unsigned int)req->transit_portno,
163
                                (unsigned int)req->dst_portno,
164
                                (unsigned int)req->effective_dst_portno,
165
                                (unsigned int)req->targetlen, target,
166
                                (unsigned long long)req->datalen, data);
167
        }
168
}
169
void log_pr(char *msg, struct peer_req *pr)
170
{
171
        char target[64], data[64];
172
        char *req_target, *req_data;
173
        struct peerd *peer = pr->peer;
174
        struct xseg *xseg = pr->peer->xseg;
175
        req_target = xseg_get_target(xseg, pr->req);
176
        req_data = xseg_get_data(xseg, pr->req);
177
        /* null terminate name in case of req->target is less than 63 characters,
178
         * and next character after name (aka first byte of next buffer) is not
179
         * null
180
         */
181
        unsigned int end = (pr->req->targetlen> 63) ? 63 : pr->req->targetlen;
182
        if (verbose) {
183
                strncpy(target, req_target, end);
184
                target[end] = 0;
185
                strncpy(data, req_data, 63);
186
                data[63] = 0;
187
                printf("%s: req id:%u, op:%u %llu:%lu serviced: %lu, retval: %lu, reqstate: %u\n"
188
                                "target[%u]:'%s', data[%llu]:\n%s------------------\n\n",
189
                                msg,
190
                                (unsigned int)(pr - peer->peer_reqs),
191
                                (unsigned int)pr->req->op,
192
                                (unsigned long long)pr->req->offset,
193
                                (unsigned long)pr->req->size,
194
                                (unsigned long)pr->req->serviced,
195
                                (unsigned long)pr->retval,
196
                                (unsigned int)pr->req->state,
197
                                (unsigned int)pr->req->targetlen, target,
198
                                (unsigned long long)pr->req->datalen, data);
199
        }
200
}
201

    
202
#ifdef MT
203
inline struct peer_req *alloc_peer_req(struct peerd *peer, struct thread *t)
204
{
205
        struct peer_req *pr;
206
        xqindex idx = xq_pop_head(&t->free_thread_reqs, t->thread_no);
207
        if (idx != Noneidx)
208
                goto out;
209

    
210
        /* try to steal from another thread */
211
        /*
212
        int i;
213
        struct thread *nt;
214
        for (i = t->thread_no + 1; i < (t->thread_no + peer->nr_threads); i++) {
215
                nt = &peer->thread[(t->thread_no + i) % peer->nr_threads];
216
                if (!xq_count(&nt->free_thread_reqs))
217
                                continue;
218
                idx = xq_pop_head(&nt->free_thread_reqs, t->thread_no);
219
                if (idx != Noneidx)
220
                        goto out;
221
        }
222
        */
223
        return NULL;
224
out:
225
        pr = peer->peer_reqs + idx;
226
        pr->thread_no = t - peer->thread;
227
        return pr;
228
}
229
#else
230
/*
231
 * free_reqs is a queue that simply contains pointer offsets to the peer_reqs
232
 * queue. If a pointer from peer_reqs is popped, we are certain that the
233
 * associated memory in peer_reqs is free to use
234
 */
235
inline struct peer_req *alloc_peer_req(struct peerd *peer)
236
{
237
        xqindex idx = xq_pop_head(&peer->free_reqs, 1);
238
        if (idx == Noneidx)
239
                return NULL;
240
        return peer->peer_reqs + idx;
241
}
242
#endif
243

    
244
inline void free_peer_req(struct peerd *peer, struct peer_req *pr)
245
{
246
        xqindex idx = pr - peer->peer_reqs;
247
        pr->req = NULL;
248
#ifdef MT
249
        struct thread *t = &peer->thread[pr->thread_no];
250
        xq_append_head(&t->free_thread_reqs, idx, 1);
251
#else
252
        xq_append_head(&peer->free_reqs, idx, 1);
253
#endif
254
}
255

    
256
/*
257
 * Count all free reqs in peer.
258
 * Racy, if multithreaded, but the sum should monotonicly increase when checked
259
 * after a termination signal is catched.
260
 */
261
int all_peer_reqs_free(struct peerd *peer)
262
{
263
        uint32_t free_reqs = 0;
264
#ifdef MT
265
        int i;
266
        for (i = 0; i < peer->nr_threads; i++) {
267
                free_reqs += xq_count(&peer->thread[i].free_thread_reqs);
268
        }
269
#else
270
        free_reqs = xq_count(&peer->free_reqs);
271
#endif
272
        if (free_reqs == peer->nr_ops)
273
                return 1;
274
        return 0;
275
}
276

    
277
struct timeval resp_start, resp_end, resp_accum = {0, 0};
278
uint64_t responds = 0;
279
void get_responds_stats(){
280
                printf("Time waiting respond %lu.%06lu sec for %llu times.\n",
281
                                //(unsigned int)(t - peer->thread),
282
                                resp_accum.tv_sec, resp_accum.tv_usec, (long long unsigned int) responds);
283
}
284

    
285
//FIXME error check
286
void fail(struct peerd *peer, struct peer_req *pr)
287
{
288
        struct xseg_request *req = pr->req;
289
        uint32_t p;
290
        if (req){
291
                XSEGLOG2(&lc, D, "failing req %u", (unsigned int) (pr - peer->peer_reqs));
292
                req->state |= XS_FAILED;
293
                //xseg_set_req_data(peer->xseg, pr->req, NULL);
294
                p = xseg_respond(peer->xseg, req, pr->portno, X_ALLOC);
295
                xseg_signal(peer->xseg, p);
296
        }
297
        free_peer_req(peer, pr);
298
#ifdef MT
299
        wake_up_next_thread(peer);
300
#endif
301
}
302

    
303
//FIXME error check
304
void complete(struct peerd *peer, struct peer_req *pr)
305
{
306
        struct xseg_request *req = pr->req;
307
        uint32_t p;
308
        if (req){
309
                req->state |= XS_SERVED;
310
                //xseg_set_req_data(peer->xseg, pr->req, NULL);
311
                //gettimeofday(&resp_start, NULL);
312
                p = xseg_respond(peer->xseg, req, pr->portno, X_ALLOC);
313
                //gettimeofday(&resp_end, NULL);
314
                //responds++;
315
                //timersub(&resp_end, &resp_start, &resp_end);
316
                //timeradd(&resp_end, &resp_accum, &resp_accum);
317
                //printf("xseg_signal: %u\n", p);
318
                xseg_signal(peer->xseg, p);
319
        }
320
        free_peer_req(peer, pr);
321
#ifdef MT
322
        wake_up_next_thread(peer);
323
#endif
324
}
325

    
326
static void handle_accepted(struct peerd *peer, struct peer_req *pr,
327
                                struct xseg_request *req)
328
{
329
        struct xseg_request *xreq = pr->req;
330
        //assert xreq == req;
331
        XSEGLOG2(&lc, D, "Handle accepted");
332
        xreq->serviced = 0;
333
        //xreq->state = XS_ACCEPTED;
334
        pr->retval = 0;
335
        dispatch(peer, pr, req, dispatch_accept);
336
}
337

    
338
static void handle_received(struct peerd *peer, struct peer_req *pr,
339
                                struct xseg_request *req)
340
{
341
        //struct xseg_request *req = pr->req;
342
        //assert req->state != XS_ACCEPTED;
343
        XSEGLOG2(&lc, D, "Handle received \n");
344
        dispatch(peer, pr, req, dispatch_receive);
345

    
346
}
347
struct timeval sub_start, sub_end, sub_accum = {0, 0};
348
uint64_t submits = 0;
349
void get_submits_stats(){
350
                printf("Time waiting submit %lu.%06lu sec for %llu times.\n",
351
                                //(unsigned int)(t - peer->thread),
352
                                sub_accum.tv_sec, sub_accum.tv_usec, (long long unsigned int) submits);
353
}
354

    
355
int submit_peer_req(struct peerd *peer, struct peer_req *pr)
356
{
357
        uint32_t ret;
358
        struct xseg_request *req = pr->req;
359
        // assert req->portno == peer->portno ?
360
        //TODO small function with error checking
361
        XSEGLOG2 (&lc, D, "submitting peer req %u\n", (unsigned int)(pr - peer->peer_reqs));
362
        ret = xseg_set_req_data(peer->xseg, req, (void *)(pr));
363
        if (ret < 0)
364
                return -1;
365
        //printf("pr: %x , req_data: %x \n", pr, xseg_get_req_data(peer->xseg, req));
366
        //gettimeofday(&sub_start, NULL);
367
        ret = xseg_submit(peer->xseg, req, pr->portno, X_ALLOC);
368
        //gettimeofday(&sub_end, NULL);
369
        //submits++;
370
        //timersub(&sub_end, &sub_start, &sub_end);
371
        //timeradd(&sub_end, &sub_accum, &sub_accum);
372
        if (ret == NoPort)
373
                return -1;
374
        xseg_signal(peer->xseg, ret);
375
        return 0;
376
}
377

    
378
#ifdef MT
379
int check_ports(struct peerd *peer, struct thread *t)
380
#else
381
int check_ports(struct peerd *peer)
382
#endif
383
{
384
        struct xseg *xseg = peer->xseg;
385
        xport portno_start = peer->portno_start;
386
        xport portno_end = peer->portno_end;
387
        struct xseg_request *accepted, *received;
388
        struct peer_req *pr;
389
        xport i;
390
        int  r, c = 0;
391

    
392
        for (i = portno_start; i <= portno_end; i++) {
393
                accepted = NULL;
394
                received = NULL;
395
                if (!isTerminate()) {
396
#ifdef MT
397
                        pr = alloc_peer_req(peer, t); 
398
#else
399
                        pr = alloc_peer_req(peer);
400
#endif
401
                        if (pr) {
402
                                accepted = xseg_accept(xseg, i, X_NONBLOCK);
403
                                if (accepted) {
404
                                        pr->req = accepted;
405
                                        pr->portno = i;
406
                                        xseg_cancel_wait(xseg, i);
407
                                        handle_accepted(peer, pr, accepted);
408
                                        c = 1;
409
                                }
410
                                else {
411
                                        free_peer_req(peer, pr);
412
                                }
413
                        }
414
                }
415
                received = xseg_receive(xseg, i, X_NONBLOCK);
416
                if (received) {
417
                        r =  xseg_get_req_data(xseg, received, (void **) &pr);
418
                        if (r < 0 || !pr){
419
                                XSEGLOG2(&lc, W, "Received request with no pr data\n");
420
                                xport p = xseg_respond(peer->xseg, received, peer->portno_start, X_ALLOC);
421
                                if (p == NoPort){
422
                                        XSEGLOG2(&lc, W, "Could not respond stale request");
423
                                        xseg_put_request(xseg, received, portno_start);
424
                                        continue;
425
                                } else {
426
                                        xseg_signal(xseg, p);
427
                                }
428
                        } else {
429
                                //maybe perform sanity check for pr
430
                                xseg_cancel_wait(xseg, i);
431
                                handle_received(peer, pr, received);
432
                                c = 1;
433
                        }
434
                }
435
        }
436

    
437
        return c;
438
}
439

    
440
#ifdef MT
441
static void* thread_loop(void *arg)
442
{
443
        struct thread *t = (struct thread *) arg;
444
        struct peerd *peer = t->peer;
445
        struct xseg *xseg = peer->xseg;
446
        xport portno_start = peer->portno_start;
447
        xport portno_end = peer->portno_end;
448
        pid_t pid =syscall(SYS_gettid);
449
        uint64_t loops;
450
        uint64_t threshold=1000/(1 + portno_end - portno_start);
451

    
452
        XSEGLOG2(&lc, D, "thread %u\n",  (unsigned int) (t- peer->thread));
453
        XSEGLOG2(&lc, I, "Thread %u has tid %u.\n", (unsigned int) (t- peer->thread), pid);
454
        xseg_init_local_signal(xseg, peer->portno_start);
455
        for (;!(isTerminate() && xq_count(&peer->free_reqs) == peer->nr_ops);) {
456
                for(loops =  threshold; loops > 0; loops--) {
457
                        if (loops == 1)
458
                                xseg_prepare_wait(xseg, peer->portno_start);
459
                        if (check_ports(peer, t))
460
                                loops = threshold;
461
                }
462
                XSEGLOG2(&lc, I, "Thread %u goes to sleep\n", (unsigned int) (t- peer->thread));
463
                xseg_wait_signal(xseg, 10000000UL);
464
                xseg_cancel_wait(xseg, peer->portno_start);
465
                XSEGLOG2(&lc, I, "Thread %u woke up\n", (unsigned int) (t- peer->thread));
466
        }
467
        wake_up_next_thread(peer);
468
        custom_peer_finalize(peer);
469
        return NULL;
470
}
471

    
472
void *init_thread_loop(void *arg)
473
{
474
        struct thread *t = (struct thread *) arg;
475
        struct peerd *peer = t->peer;
476
        char *thread_id;
477
        int i;
478

    
479
        /*
480
         * We need an identifier for every thread that will spin in peerd_loop.
481
         * The following code is a way to create a string of this format:
482
         *                "Thread <num>"
483
         * minus the null terminator. What we do is we create this string with
484
         * snprintf and then resize it to exclude the null terminator with
485
         * realloc. Finally, the result string is passed to the (void *arg) field
486
         * of struct thread.
487
         *
488
         * Since the highest thread number can't be more than 5 digits, using 13
489
         * chars should be more than enough.
490
         */
491
        thread_id = malloc(13 * sizeof(char));
492
        snprintf(thread_id, 13, "Thread %ld", t - t->peer->thread);
493
        for (i = 0; thread_id[i]; i++) {}
494
        t->arg = (void *)realloc(thread_id, i-1);
495
        pthread_setspecific(threadkey, t);
496

    
497
        //Start thread loop
498
        (void)peer->peerd_loop(t);
499

    
500
        wake_up_next_thread(peer);
501
        custom_peer_finalize(peer);
502

    
503
        return NULL;
504
}
505

    
506
int peerd_start_threads(struct peerd *peer)
507
{
508
        int i;
509
        uint32_t nr_threads = peer->nr_threads;
510
        //TODO err check
511
        for (i = 0; i < nr_threads; i++) {
512
                peer->thread[i].thread_no = i;
513
                peer->thread[i].peer = peer;
514
                pthread_create(&peer->thread[i].tid, NULL,
515
                                        init_thread_loop, (void *)(peer->thread + i));
516
        }
517

    
518
        if (peer->interactive_func)
519
                peer->interactive_func();
520
        for (i = 0; i < nr_threads; i++) {
521
                pthread_join(peer->thread[i].tid, NULL);
522
        }
523

    
524
        return 0;
525
}
526
#endif
527

    
528
int defer_request(struct peerd *peer, struct peer_req *pr)
529
{
530
        int r;
531
        xport p;
532
        if (!canDefer(peer)){
533
                XSEGLOG2(&lc, E, "Peer cannot defer requests");
534
                return -1;
535
        }
536
        p = xseg_forward(peer->xseg, pr->req, peer->defer_portno, pr->portno,
537
                        X_ALLOC);
538
        if (p == NoPort){
539
                XSEGLOG2(&lc, E, "Cannot defer request %lx", pr->req);
540
                return -1;
541
        }
542
        r = xseg_signal(peer->xseg, p);
543
        if (r < 0) {
544
                XSEGLOG2(&lc, W, "Cannot signal port %lu", p);
545
        }
546
        free_peer_req(peer, pr);
547
        return 0;
548
}
549

    
550
/*
551
 * generic_peerd_loop is a general-purpose port-checker loop that is
552
 * suitable both for multi-threaded and single-threaded peers.
553
 */
554
static int generic_peerd_loop(void *arg)
555
{
556
#ifdef MT
557
        struct thread *t = (struct thread *) arg;
558
        struct peerd *peer = t->peer;
559
        char *id = t->arg;
560
#else
561
        struct peerd *peer = (struct peerd *) arg;
562
        char id[4] = {'P','e','e','r'};
563
#endif
564
        struct xseg *xseg = peer->xseg;
565
        xport portno_start = peer->portno_start;
566
        xport portno_end = peer->portno_end;
567
        pid_t pid = syscall(SYS_gettid);
568
        uint64_t threshold=1000/(1 + portno_end - portno_start);
569
        uint64_t loops;
570

    
571
        XSEGLOG2(&lc, I, "%s has tid %u.\n", id, pid);
572
        xseg_init_local_signal(xseg, peer->portno_start);
573
        //for (;!(isTerminate() && xq_count(&peer->free_reqs) == peer->nr_ops);) {
574
        for (;!(isTerminate() && all_peer_reqs_free(peer));) {
575
                //Heart of peerd_loop. This loop is common for everyone.
576
                for(loops = threshold; loops > 0; loops--) {
577
                        if (loops == 1)
578
                                xseg_prepare_wait(xseg, peer->portno_start);
579
#ifdef MT
580
                        if (check_ports(peer, t))
581
#else
582
                        if (check_ports(peer))
583
#endif
584
                                loops = threshold;
585
                }
586
#ifdef ST_THREADS
587
                if (ta){
588
                        st_sleep(0);
589
                        continue;
590
                }
591
#endif
592
                XSEGLOG2(&lc, I, "%s goes to sleep\n", id);
593
                xseg_wait_signal(xseg, 10000000UL);
594
                xseg_cancel_wait(xseg, peer->portno_start);
595
                XSEGLOG2(&lc, I, "%s woke up\n", id);
596
        }
597
        return 0;
598
}
599

    
600
static int init_peerd_loop(struct peerd *peer)
601
{
602
        struct xseg *xseg = peer->xseg;
603

    
604
        peer->peerd_loop(peer);
605
        custom_peer_finalize(peer);
606
        xseg_quit_local_signal(xseg, peer->portno_start);
607

    
608
        return 0;
609
}
610

    
611
static struct xseg *join(char *spec)
612
{
613
        struct xseg_config config;
614
        struct xseg *xseg;
615

    
616
        (void)xseg_parse_spec(spec, &config);
617
        xseg = xseg_join(config.type, config.name, PEER_TYPE, NULL);
618
        if (xseg)
619
                return xseg;
620

    
621
        (void)xseg_create(&config);
622
        return xseg_join(config.type, config.name, PEER_TYPE, NULL);
623
}
624

    
625
static struct peerd* peerd_init(uint32_t nr_ops, char* spec, long portno_start,
626
                        long portno_end, uint32_t nr_threads, xport defer_portno)
627
{
628
        int i;
629
        struct peerd *peer;
630
        struct xseg_port *port;
631
        void *sd = NULL;
632
        xport p;
633

    
634
#ifdef ST_THREADS
635
        st_init();
636
#endif
637
        peer = malloc(sizeof(struct peerd));
638
        if (!peer) {
639
                perror("malloc");
640
                return NULL;
641
        }
642
        peer->nr_ops = nr_ops;
643
        peer->defer_portno = defer_portno;
644
#ifdef MT
645
        peer->nr_threads = nr_threads;
646
        peer->thread = calloc(nr_threads, sizeof(struct thread));
647
        if (!peer->thread)
648
                goto malloc_fail;
649
        if (!xq_alloc_empty(&peer->threads, nr_threads))
650
                goto malloc_fail;
651
        for (i = 0; i < nr_threads; i++) {
652
                if (!xq_alloc_empty(&peer->thread[i].free_thread_reqs, nr_ops))
653
                        goto malloc_fail;
654
        }
655
        for (i = 0; i < nr_ops; i++) {
656
                __xq_append_head(&peer->thread[i % nr_threads].free_thread_reqs, (xqindex)i);
657
        }
658

    
659
        pthread_key_create(&threadkey, NULL);
660
#else
661
        if (!xq_alloc_seq(&peer->free_reqs, nr_ops, nr_ops))
662
                goto malloc_fail;
663
#endif
664
        peer->peer_reqs = calloc(nr_ops, sizeof(struct peer_req));
665
        if (!peer->peer_reqs){
666
malloc_fail:
667
                perror("malloc");
668
                return NULL;
669
        }
670
        if (xseg_initialize()){
671
                printf("cannot initialize library\n");
672
                return NULL;
673
        }
674
        peer->xseg = join(spec);
675
        if (!peer->xseg)
676
                return NULL;
677

    
678
        peer->portno_start = (xport) portno_start;
679
        peer->portno_end= (xport) portno_end;
680

    
681
        /*
682
         * Start binding ports from portno_start to portno_end.
683
         * The first port we bind will have its signal_desc initialized by xseg
684
         * and the same signal_desc will be used for all the other ports.
685
         */
686
        for (p = peer->portno_start; p <= peer->portno_end; p++) {
687
                port = xseg_bind_port(peer->xseg, p, sd);
688
                if (!port){
689
                        printf("cannot bind to port %u\n", (unsigned int) p);
690
                        return NULL;
691
                }
692
                if (p == peer->portno_start)
693
                        sd = xseg_get_signal_desc(peer->xseg, port);
694
        }
695

    
696
        printf("Peer on ports  %u-%u\n", peer->portno_start, peer->portno_end);
697

    
698
        for (i = 0; i < nr_ops; i++) {
699
                peer->peer_reqs[i].peer = peer;
700
                peer->peer_reqs[i].req = NULL;
701
                peer->peer_reqs[i].retval = 0;
702
                peer->peer_reqs[i].priv = NULL;
703
                peer->peer_reqs[i].portno = NoPort;
704

    
705
        //Plug default peerd_loop. This can change later on by custom_peer_init.
706
        peer->peerd_loop = generic_peerd_loop;
707

    
708
#ifdef ST_THREADS
709
                peer->peer_reqs[i].cond = st_cond_new(); //FIXME err check
710
#endif
711
        }
712
#ifdef MT
713
        peer->interactive_func = NULL;
714
#endif
715
        return peer;
716

    
717
}
718

    
719
int pidfile_remove(char *path, int fd)
720
{
721
        close(fd);
722
        return (unlink(path));
723
}
724

    
725
int pidfile_write(int pid_fd)
726
{
727
        char buf[16];
728
        snprintf(buf, sizeof(buf), "%ld", syscall(SYS_gettid));
729
        buf[15] = 0;
730

    
731
        lseek(pid_fd, 0, SEEK_SET);
732
        int ret = write(pid_fd, buf, strlen(buf));
733
        return ret;
734
}
735

    
736
int pidfile_read(char *path, pid_t *pid)
737
{
738
        char buf[16], *endptr;
739
        *pid = 0;
740

    
741
        int fd = open(path, O_RDONLY);
742
        if (fd < 0)
743
                return -1;
744
        int ret = read(fd, buf, 15);
745
        buf[15]=0;
746
        close(fd);
747
        if (ret < 0)
748
                return -1;
749
        else{
750
                *pid = strtol(buf, &endptr, 10);
751
                if (endptr != &buf[ret]){
752
                        *pid = 0;
753
                        return -1;
754
                }
755
        }
756
        return 0;
757
}
758

    
759
int pidfile_open(char *path, pid_t *old_pid)
760
{
761
        //nfs version > 3
762
        int fd = open(path, O_CREAT|O_EXCL|O_WRONLY, S_IWUSR);
763
        if (fd < 0){
764
                if (errno == EEXIST)
765
                        pidfile_read(path, old_pid);
766
        }
767
        return fd;
768
}
769

    
770
void usage(char *argv0)
771
{
772
        fprintf(stderr, "Usage: %s [general options] [custom peer options]\n\n", argv0);
773
        fprintf(stderr, "General peer options:\n"
774
                "  Option      | Default | \n"
775
                "  --------------------------------------------\n"
776
                "    -g        | None    | Segment spec to join\n"
777
                "    -sp       | NoPort  | Start portno to bind\n"
778
                "    -ep       | NoPort  | End portno to bind\n"
779
                "    -p        | NoPort  | Portno to bind\n"
780
                "    -n        | 16      | Number of ops\n"
781
                "    -v        | 0       | Verbosity level\n"
782
                "    -l        | None    | Logfile \n"
783
                "    -d        | No      | Daemonize \n"
784
                "    --pidfile | None    | Pidfile \n"
785
#ifdef MT
786
                "    -t        | No      | Number of threads \n"
787
#endif
788
                "\n"
789
               );
790
        custom_peer_usage();
791
}
792

    
793
int main(int argc, char *argv[])
794
{
795
        struct peerd *peer = NULL;
796
        //parse args
797
        int r;
798
        long portno_start = -1, portno_end = -1, portno = -1;
799

    
800
        //set defaults here
801
        int daemonize = 0, help = 0;
802
        uint32_t nr_ops = 16;
803
        uint32_t nr_threads = 1;
804
        unsigned int debug_level = 0;
805
        xport defer_portno = NoPort;
806
        pid_t old_pid;
807
        int pid_fd = -1;
808

    
809
        char spec[MAX_SPEC_LEN + 1];
810
        char logfile[MAX_LOGFILE_LEN + 1];
811
        char pidfile[MAX_PIDFILE_LEN + 1];
812

    
813
        logfile[0] = 0;
814
        pidfile[0] = 0;
815
        spec[0] = 0;
816

    
817
        //capture here -g spec, -n nr_ops, -p portno, -t nr_threads -v verbose level
818
        // -dp xseg_portno to defer blocking requests
819
        // -l log file ?
820
        //TODO print messages on arg parsing error
821
        BEGIN_READ_ARGS(argc, argv);
822
        READ_ARG_STRING("-g", spec, MAX_SPEC_LEN);
823
        READ_ARG_ULONG("-sp", portno_start);
824
        READ_ARG_ULONG("-ep", portno_end);
825
        READ_ARG_ULONG("-p", portno);
826
        READ_ARG_ULONG("-n", nr_ops);
827
        READ_ARG_ULONG("-v", debug_level);
828
#ifdef MT
829
        READ_ARG_ULONG("-t", nr_threads);
830
#endif
831
        READ_ARG_ULONG("-dp", defer_portno);
832
        READ_ARG_STRING("-l", logfile, MAX_LOGFILE_LEN);
833
        READ_ARG_BOOL("-d", daemonize);
834
        READ_ARG_BOOL("-h", help);
835
        READ_ARG_BOOL("--help", help);
836
        READ_ARG_STRING("--pidfile", pidfile, MAX_PIDFILE_LEN);
837
        END_READ_ARGS();
838

    
839
        if (help){
840
                usage(argv[0]);
841
                return 0;
842
        }
843

    
844
        r = init_logctx(&lc, argv[0], debug_level, logfile,
845
                        REDIRECT_STDOUT|REDIRECT_STDERR);
846
        if (r < 0){
847
                XSEGLOG("Cannot initialize logging to logfile");
848
                return -1;
849
        }
850
        XSEGLOG2(&lc, D, "Main thread has tid %ld.\n", syscall(SYS_gettid));
851

    
852
        if (pidfile[0]){
853
                pid_fd = pidfile_open(pidfile, &old_pid);
854
                if (pid_fd < 0) {
855
                        if (old_pid) {
856
                                XSEGLOG2(&lc, E, "Daemon already running, pid: %d.", old_pid);
857
                        } else {
858
                                XSEGLOG2(&lc, E, "Cannot open or create pidfile");
859
                        }
860
                        return -1;
861
                }
862
        }
863

    
864
        if (daemonize){
865
                if (daemon(0, 1) < 0){
866
                        XSEGLOG2(&lc, E, "Cannot daemonize");
867
                        r = -1;
868
                        goto out;
869
                }
870
        }
871

    
872
        pidfile_write(pid_fd);
873

    
874
        //TODO perform argument sanity checks
875
        verbose = debug_level;
876
        if (portno != -1) {
877
                portno_start = portno;
878
                portno_end = portno;
879
        }
880
        if (portno_start == -1 || portno_end == -1){
881
                XSEGLOG2(&lc, E, "Portno or {portno_start, portno_end} must be supplied");
882
                usage(argv[0]);
883
                r = -1;
884
                goto out;
885
        }
886

    
887
        peer = peerd_init(nr_ops, spec, portno_start, portno_end, nr_threads, defer_portno);
888
        if (!peer){
889
                r = -1;
890
                goto out;
891
        }
892
        setup_signals(peer);
893
        r = custom_peer_init(peer, argc, argv);
894
        if (r < 0)
895
                goto out;
896
#if defined(MT)
897
        //TODO err check
898
        peerd_start_threads(peer);
899
#elif defined(ST_THREADS)
900
        st_thread_t st = st_thread_create(init_peerd_loop, peer, 1, 0);
901
        r = st_thread_join(st, NULL);
902
#else
903
        r = init_peerd_loop(peer);
904
#endif
905
out:
906
        if (pid_fd > 0)
907
                pidfile_remove(pidfile, pid_fd);
908
        return r;
909
}