Statistics
| Branch: | Tag: | Revision:

root / xseg / peers / user / peer.c @ de4c09bb

History | View | Annotate | Download (21.8 kB)

1
/*
2
 * Copyright 2012 GRNET S.A. All rights reserved.
3
 *
4
 * Redistribution and use in source and binary forms, with or
5
 * without modification, are permitted provided that the following
6
 * conditions are met:
7
 *
8
 *   1. Redistributions of source code must retain the above
9
 *      copyright notice, this list of conditions and the following
10
 *      disclaimer.
11
 *   2. Redistributions in binary form must reproduce the above
12
 *      copyright notice, this list of conditions and the following
13
 *      disclaimer in the documentation and/or other materials
14
 *      provided with the distribution.
15
 *
16
 * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17
 * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18
 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19
 * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20
 * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21
 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22
 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23
 * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24
 * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25
 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26
 * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27
 * POSSIBILITY OF SUCH DAMAGE.
28
 *
29
 * The views and conclusions contained in the software and
30
 * documentation are those of the authors and should not be
31
 * interpreted as representing official policies, either expressed
32
 * or implied, of GRNET S.A.
33
 */
34

    
35
#define _GNU_SOURCE
36
#include <stdio.h>
37
#include <stdlib.h>
38
#include <sys/types.h>
39
#include <unistd.h>
40
#include <sys/syscall.h>
41
#include <sys/time.h>
42
#include <signal.h>
43
#include <sys/stat.h>
44
#include <fcntl.h>
45
#include <errno.h>
46

    
47
#ifdef MT
48
#include <pthread.h>
49
#endif
50

    
51
#include <xseg/xseg.h>
52
#include <peer.h>
53

    
54
#ifdef MT
55
#define PEER_TYPE "pthread"
56
#else
57
#define PEER_TYPE "posix"
58
#endif
59

    
60
//FIXME this should not be defined here probably
61
#define MAX_SPEC_LEN 128
62
#define MAX_PIDFILE_LEN 512
63

    
64
volatile unsigned int terminated = 0;
65
unsigned int verbose = 0;
66
struct log_ctx lc;
67
#ifdef ST_THREADS
68
uint32_t ta = 0;
69
#endif
70

    
71
#ifdef MT
72
struct peerd *global_peer;
73

    
74
struct thread {
75
        struct peerd *peer;
76
        pthread_t tid;
77
        pthread_cond_t cond;
78
        pthread_mutex_t lock;
79
        int thread_no;
80
        void (*func)(void *arg);
81
        void *arg;
82
};
83

    
84
inline static struct thread* alloc_thread(struct peerd *peer)
85
{
86
        xqindex idx = xq_pop_head(&peer->threads, 1);
87
        if (idx == Noneidx)
88
                return NULL;
89
        return peer->thread + idx;
90
}
91

    
92
inline static void free_thread(struct peerd *peer, struct thread *t)
93
{
94
        xqindex idx = t - peer->thread;
95
        xq_append_head(&peer->threads, idx, 1);
96
}
97

    
98

    
99
inline static void __wake_up_thread(struct thread *t)
100
{
101
        pthread_mutex_lock(&t->lock);
102
        pthread_cond_signal(&t->cond);
103
        pthread_mutex_unlock(&t->lock);
104
}
105

    
106
inline static void wake_up_thread(struct thread* t)
107
{
108
        if (t){
109
                __wake_up_thread(t);
110
        }
111
}
112

    
113
inline static int wake_up_next_thread(struct peerd *peer)
114
{
115
        return (xseg_signal(peer->xseg, peer->portno_start));
116
}
117
#endif
118

    
119
/*
120
 * extern is needed if this function is going to be called by another file
121
 * such as bench-xseg.c
122
 */
123

    
124
void signal_handler(int signal)
125
{
126
        XSEGLOG2(&lc, I, "Caught signal. Terminating gracefully");
127
        terminated = 1;
128
#ifdef MT
129
        wake_up_next_thread(global_peer);
130
#endif
131
}
132

    
133
void renew_logfile(int signal)
134
{
135
        XSEGLOG2(&lc, I, "Caught signal. Renewing logfile");
136
        renew_logctx(&lc, NULL, verbose, NULL, REOPEN_FILE);
137
}
138

    
139
static int setup_signals(struct peerd *peer)
140
{
141
        int r;
142
        struct sigaction sa;
143
#ifdef MT
144
        global_peer = peer;
145
#endif
146
        sigemptyset(&sa.sa_mask);
147
        sa.sa_flags = 0;
148
        sa.sa_handler = signal_handler;
149
        r = sigaction(SIGTERM, &sa, NULL);
150
        if (r < 0)
151
                return r;
152
        r = sigaction(SIGINT, &sa, NULL);
153
        if (r < 0)
154
                return r;
155
        r = sigaction(SIGQUIT, &sa, NULL);
156
        if (r < 0)
157
                return r;
158

    
159
        sa.sa_handler = renew_logfile;
160
        r = sigaction(SIGUSR1, &sa, NULL);
161
        if (r < 0)
162
                return r;
163

    
164
        return r;
165
}
166

    
167
inline int canDefer(struct peerd *peer)
168
{
169
        return !(peer->defer_portno == NoPort);
170
}
171

    
172
void print_req(struct xseg *xseg, struct xseg_request *req)
173
{
174
        char target[64], data[64];
175
        char *req_target, *req_data;
176
        unsigned int end = (req->targetlen> 63) ? 63 : req->targetlen;
177
        req_target = xseg_get_target(xseg, req);
178
        req_data = xseg_get_data(xseg, req);
179

    
180
        if (1) {
181
                strncpy(target, req_target, end);
182
                target[end] = 0;
183
                strncpy(data, req_data, 63);
184
                data[63] = 0;
185
                printf("req id:%lu, op:%u %llu:%lu serviced: %lu, reqstate: %u\n"
186
                                "src: %u, transit: %u, dst: %u effective dst: %u\n"
187
                                "target[%u]:'%s', data[%llu]:\n%s------------------\n\n",
188
                                (unsigned long)(req),
189
                                (unsigned int)req->op,
190
                                (unsigned long long)req->offset,
191
                                (unsigned long)req->size,
192
                                (unsigned long)req->serviced,
193
                                (unsigned int)req->state,
194
                                (unsigned int)req->src_portno,
195
                                (unsigned int)req->transit_portno,
196
                                (unsigned int)req->dst_portno,
197
                                (unsigned int)req->effective_dst_portno,
198
                                (unsigned int)req->targetlen, target,
199
                                (unsigned long long)req->datalen, data);
200
        }
201
}
202
void log_pr(char *msg, struct peer_req *pr)
203
{
204
        char target[64], data[64];
205
        char *req_target, *req_data;
206
        struct peerd *peer = pr->peer;
207
        struct xseg *xseg = pr->peer->xseg;
208
        req_target = xseg_get_target(xseg, pr->req);
209
        req_data = xseg_get_data(xseg, pr->req);
210
        /* null terminate name in case of req->target is less than 63 characters,
211
         * and next character after name (aka first byte of next buffer) is not
212
         * null
213
         */
214
        unsigned int end = (pr->req->targetlen> 63) ? 63 : pr->req->targetlen;
215
        if (verbose) {
216
                strncpy(target, req_target, end);
217
                target[end] = 0;
218
                strncpy(data, req_data, 63);
219
                data[63] = 0;
220
                printf("%s: req id:%u, op:%u %llu:%lu serviced: %lu, retval: %lu, reqstate: %u\n"
221
                                "target[%u]:'%s', data[%llu]:\n%s------------------\n\n",
222
                                msg,
223
                                (unsigned int)(pr - peer->peer_reqs),
224
                                (unsigned int)pr->req->op,
225
                                (unsigned long long)pr->req->offset,
226
                                (unsigned long)pr->req->size,
227
                                (unsigned long)pr->req->serviced,
228
                                (unsigned long)pr->retval,
229
                                (unsigned int)pr->req->state,
230
                                (unsigned int)pr->req->targetlen, target,
231
                                (unsigned long long)pr->req->datalen, data);
232
        }
233
}
234

    
235
inline struct peer_req *alloc_peer_req(struct peerd *peer)
236
{
237
        xqindex idx = xq_pop_head(&peer->free_reqs, 1);
238
        if (idx == Noneidx)
239
                return NULL;
240
        return peer->peer_reqs + idx;
241
}
242

    
243
inline void free_peer_req(struct peerd *peer, struct peer_req *pr)
244
{
245
        xqindex idx = pr - peer->peer_reqs;
246
        pr->req = NULL;
247
        xq_append_head(&peer->free_reqs, idx, 1);
248
}
249

    
250
struct timeval resp_start, resp_end, resp_accum = {0, 0};
251
uint64_t responds = 0;
252
void get_responds_stats(){
253
                printf("Time waiting respond %lu.%06lu sec for %llu times.\n",
254
                                //(unsigned int)(t - peer->thread),
255
                                resp_accum.tv_sec, resp_accum.tv_usec, (long long unsigned int) responds);
256
}
257

    
258
//FIXME error check
259
void fail(struct peerd *peer, struct peer_req *pr)
260
{
261
        struct xseg_request *req = pr->req;
262
        uint32_t p;
263
        XSEGLOG2(&lc, D, "failing req %u", (unsigned int) (pr - peer->peer_reqs));
264
        req->state |= XS_FAILED;
265
        //xseg_set_req_data(peer->xseg, pr->req, NULL);
266
        p = xseg_respond(peer->xseg, req, pr->portno, X_ALLOC);
267
        xseg_signal(peer->xseg, p);
268
        free_peer_req(peer, pr);
269
#ifdef MT
270
        wake_up_next_thread(peer);
271
#endif
272
}
273

    
274
//FIXME error check
275
void complete(struct peerd *peer, struct peer_req *pr)
276
{
277
        struct xseg_request *req = pr->req;
278
        uint32_t p;
279
        req->state |= XS_SERVED;
280
        //xseg_set_req_data(peer->xseg, pr->req, NULL);
281
        //gettimeofday(&resp_start, NULL);
282
        p = xseg_respond(peer->xseg, req, pr->portno, X_ALLOC);
283
        //gettimeofday(&resp_end, NULL);
284
        //responds++;
285
        //timersub(&resp_end, &resp_start, &resp_end);
286
        //timeradd(&resp_end, &resp_accum, &resp_accum);
287
        //printf("xseg_signal: %u\n", p);
288
        xseg_signal(peer->xseg, p);
289
        free_peer_req(peer, pr);
290
#ifdef MT
291
        wake_up_next_thread(peer);
292
#endif
293
}
294

    
295
static void handle_accepted(struct peerd *peer, struct peer_req *pr,
296
                                struct xseg_request *req)
297
{
298
        struct xseg_request *xreq = pr->req;
299
        //assert xreq == req;
300
        XSEGLOG2(&lc, D, "Handle accepted");
301
        xreq->serviced = 0;
302
        //xreq->state = XS_ACCEPTED;
303
        pr->retval = 0;
304
        dispatch(peer, pr, req, dispatch_accept);
305
}
306

    
307
static void handle_received(struct peerd *peer, struct peer_req *pr,
308
                                struct xseg_request *req)
309
{
310
        //struct xseg_request *req = pr->req;
311
        //assert req->state != XS_ACCEPTED;
312
        XSEGLOG2(&lc, D, "Handle received \n");
313
        dispatch(peer, pr, req, dispatch_receive);
314

    
315
}
316
struct timeval sub_start, sub_end, sub_accum = {0, 0};
317
uint64_t submits = 0;
318
void get_submits_stats(){
319
                printf("Time waiting submit %lu.%06lu sec for %llu times.\n",
320
                                //(unsigned int)(t - peer->thread),
321
                                sub_accum.tv_sec, sub_accum.tv_usec, (long long unsigned int) submits);
322
}
323

    
324
int submit_peer_req(struct peerd *peer, struct peer_req *pr)
325
{
326
        uint32_t ret;
327
        struct xseg_request *req = pr->req;
328
        // assert req->portno == peer->portno ?
329
        //TODO small function with error checking
330
        XSEGLOG2 (&lc, D, "submitting peer req %u\n", (unsigned int)(pr - peer->peer_reqs));
331
        ret = xseg_set_req_data(peer->xseg, req, (void *)(pr));
332
        if (ret < 0)
333
                return -1;
334
        //printf("pr: %x , req_data: %x \n", pr, xseg_get_req_data(peer->xseg, req));
335
        //gettimeofday(&sub_start, NULL);
336
        ret = xseg_submit(peer->xseg, req, pr->portno, X_ALLOC);
337
        //gettimeofday(&sub_end, NULL);
338
        //submits++;
339
        //timersub(&sub_end, &sub_start, &sub_end);
340
        //timeradd(&sub_end, &sub_accum, &sub_accum);
341
        if (ret == NoPort)
342
                return -1;
343
        xseg_signal(peer->xseg, ret);
344
        return 0;
345
}
346

    
347
int check_ports(struct peerd *peer)
348
{
349
        struct xseg *xseg = peer->xseg;
350
        xport portno_start = peer->portno_start;
351
        xport portno_end = peer->portno_end;
352
        struct xseg_request *accepted, *received;
353
        struct peer_req *pr;
354
        xport i;
355
        int  r, c = 0;
356

    
357
        for (i = portno_start; i <= portno_end; i++) {
358
                accepted = NULL;
359
                received = NULL;
360
                if (!isTerminate()) {
361
                        pr = alloc_peer_req(peer);
362
                        if (pr) {
363
                                accepted = xseg_accept(xseg, i, X_NONBLOCK);
364
                                if (accepted) {
365
                                        pr->req = accepted;
366
                                        pr->portno = i;
367
                                        xseg_cancel_wait(xseg, i);
368
                                        handle_accepted(peer, pr, accepted);
369
                                        c = 1;
370
                                }
371
                                else {
372
                                        free_peer_req(peer, pr);
373
                                }
374
                        }
375
                }
376
                received = xseg_receive(xseg, i, X_NONBLOCK);
377
                if (received) {
378
                        r =  xseg_get_req_data(xseg, received, (void **) &pr);
379
                        if (r < 0 || !pr){
380
                                XSEGLOG2(&lc, W, "Received request with no pr data\n");
381
                                xport p = xseg_respond(peer->xseg, received, peer->portno_start, X_ALLOC);
382
                                if (p == NoPort){
383
                                        XSEGLOG2(&lc, W, "Could not respond stale request");
384
                                        xseg_put_request(xseg, received, portno_start);
385
                                        continue;
386
                                } else {
387
                                        xseg_signal(xseg, p);
388
                                }
389
                        } else {
390
                                //maybe perform sanity check for pr
391
                                xseg_cancel_wait(xseg, i);
392
                                handle_received(peer, pr, received);
393
                                c = 1;
394
                        }
395
                }
396
        }
397

    
398
        return c;
399
}
400

    
401
#ifdef MT
402
static void* thread_loop(void *arg)
403
{
404
        struct thread *t = (struct thread *) arg;
405
        struct peerd *peer = t->peer;
406
        struct xseg *xseg = peer->xseg;
407
        xport portno_start = peer->portno_start;
408
        xport portno_end = peer->portno_end;
409
        pid_t pid =syscall(SYS_gettid);
410
        uint64_t loops;
411
        uint64_t threshold=1000/(1 + portno_end - portno_start);
412

    
413
        XSEGLOG2(&lc, D, "thread %u\n",  (unsigned int) (t- peer->thread));
414
        XSEGLOG2(&lc, I, "Thread %u has tid %u.\n", (unsigned int) (t- peer->thread), pid);
415
        xseg_init_local_signal(xseg, peer->portno_start);
416
        for (;!(isTerminate() && xq_count(&peer->free_reqs) == peer->nr_ops);) {
417
                for(loops =  threshold; loops > 0; loops--) {
418
                        if (loops == 1)
419
                                xseg_prepare_wait(xseg, peer->portno_start);
420
                        if (check_ports(peer))
421
                                loops = threshold;
422
                }
423
                XSEGLOG2(&lc, I, "Thread %u goes to sleep\n", (unsigned int) (t- peer->thread));
424
                xseg_wait_signal(xseg, 10000000UL);
425
                xseg_cancel_wait(xseg, peer->portno_start);
426
                XSEGLOG2(&lc, I, "Thread %u woke up\n", (unsigned int) (t- peer->thread));
427
        }
428
        wake_up_next_thread(peer);
429
        custom_peer_finalize(peer);
430
        return NULL;
431
}
432

    
433
void *init_thread_loop(void *arg)
434
{
435
        struct thread *t = (struct thread *) arg;
436
        struct peerd *peer = t->peer;
437
        char *thread_id;
438
        int i;
439

    
440
        /*
441
         * We need an identifier for every thread that will spin in peerd_loop.
442
         * The following code is a way to create a string of this format:
443
         *                "Thread <num>"
444
         * minus the null terminator. What we do is we create this string with
445
         * snprintf and then resize it to exclude the null terminator with
446
         * realloc. Finally, the result string is passed to the (void *arg) field
447
         * of struct thread.
448
         *
449
         * Since the highest thread number can't be more than 5 digits, using 13
450
         * chars should be more than enough.
451
         */
452
        thread_id = malloc(13 * sizeof(char));
453
        snprintf(thread_id, 13, "Thread %ld", t - t->peer->thread);
454
        for (i = 0; thread_id[i]; i++) {}
455
        t->arg = (void *)realloc(thread_id, i-1);
456

    
457
        //Start thread loop
458
        (void)peer->peerd_loop(t);
459

    
460
        wake_up_next_thread(peer);
461
        custom_peer_finalize(peer);
462

    
463
        return NULL;
464
}
465

    
466
int peerd_start_threads(struct peerd *peer)
467
{
468
        int i;
469
        uint32_t nr_threads = peer->nr_threads;
470
        //TODO err check
471
        for (i = 0; i < nr_threads; i++) {
472
                peer->thread[i].func = NULL;
473
                peer->thread[i].arg = NULL;
474
                peer->thread[i].peer = peer;
475
                pthread_cond_init(&peer->thread[i].cond,NULL);
476
                pthread_mutex_init(&peer->thread[i].lock, NULL);
477
                pthread_create(&peer->thread[i].tid, NULL,
478
                                        init_thread_loop, (void *)(peer->thread + i));
479
        }
480

    
481
        if (peer->interactive_func)
482
                peer->interactive_func();
483
        for (i = 0; i < nr_threads; i++) {
484
                pthread_join(peer->thread[i].tid, NULL);
485
        }
486

    
487
        return 0;
488
}
489
#endif
490

    
491
int defer_request(struct peerd *peer, struct peer_req *pr)
492
{
493
        int r;
494
        xport p;
495
        if (!canDefer(peer)){
496
                XSEGLOG2(&lc, E, "Peer cannot defer requests");
497
                return -1;
498
        }
499
        p = xseg_forward(peer->xseg, pr->req, peer->defer_portno, pr->portno,
500
                        X_ALLOC);
501
        if (p == NoPort){
502
                XSEGLOG2(&lc, E, "Cannot defer request %lx", pr->req);
503
                return -1;
504
        }
505
        r = xseg_signal(peer->xseg, p);
506
        if (r < 0) {
507
                XSEGLOG2(&lc, W, "Cannot signal port %lu", p);
508
        }
509
        free_peer_req(peer, pr);
510
        return 0;
511
}
512

    
513
/*
514
 * generic_peerd_loop is a general-purpose port-checker loop that is
515
 * suitable both for multi-threaded and single-threaded peers.
516
 */
517
static int generic_peerd_loop(void *arg)
518
{
519
#ifdef MT
520
        struct thread *t = (struct thread *) arg;
521
        struct peerd *peer = t->peer;
522
        char *id = t->arg;
523
#else
524
        struct peerd *peer = (struct peerd *) arg;
525
        char id[4] = {'P','e','e','r'};
526
#endif
527
        struct xseg *xseg = peer->xseg;
528
        xport portno_start = peer->portno_start;
529
        xport portno_end = peer->portno_end;
530
        pid_t pid = syscall(SYS_gettid);
531
        uint64_t threshold=1000/(1 + portno_end - portno_start);
532
        uint64_t loops;
533

    
534
        XSEGLOG2(&lc, I, "%s has tid %u.\n", id, pid);
535
        xseg_init_local_signal(xseg, peer->portno_start);
536
        for (;!(isTerminate() && xq_count(&peer->free_reqs) == peer->nr_ops);) {
537
#ifdef MT
538
                if (t->func) {
539
                        XSEGLOG2(&lc, D, "Thread %u executes function\n", (unsigned int) (t- peer->thread));
540
                        xseg_cancel_wait(xseg, peer->portno_start);
541
                        t->func(t->arg);
542
                        t->func = NULL;
543
                        t->arg = NULL;
544
                        continue;
545
                }
546
#endif
547
                //Heart of peerd_loop. This loop is common for everyone.
548
                for(loops = threshold; loops > 0; loops--) {
549
                        if (check_ports(peer))
550
                                loops = threshold;
551
                }
552
                xseg_prepare_wait(xseg, peer->portno_start);
553
#ifdef ST_THREADS
554
                if (ta){
555
                        st_sleep(0);
556
                        continue;
557
                }
558
#endif
559
                XSEGLOG2(&lc, I, "%s goes to sleep\n", id);
560
                xseg_wait_signal(xseg, 10000000UL);
561
                xseg_cancel_wait(xseg, peer->portno_start);
562
                XSEGLOG2(&lc, I, "%s woke up\n", id);
563
        }
564
        return 0;
565
}
566

    
567
static int init_peerd_loop(struct peerd *peer)
568
{
569
        struct xseg *xseg = peer->xseg;
570

    
571
        peer->peerd_loop(peer);
572
        custom_peer_finalize(peer);
573
        xseg_quit_local_signal(xseg, peer->portno_start);
574

    
575
        return 0;
576
}
577

    
578
static struct xseg *join(char *spec)
579
{
580
        struct xseg_config config;
581
        struct xseg *xseg;
582

    
583
        (void)xseg_parse_spec(spec, &config);
584
        xseg = xseg_join(config.type, config.name, PEER_TYPE, NULL);
585
        if (xseg)
586
                return xseg;
587

    
588
        (void)xseg_create(&config);
589
        return xseg_join(config.type, config.name, PEER_TYPE, NULL);
590
}
591

    
592
static struct peerd* peerd_init(uint32_t nr_ops, char* spec, long portno_start,
593
                        long portno_end, uint32_t nr_threads, xport defer_portno)
594
{
595
        int i;
596
        struct peerd *peer;
597
        struct xseg_port *port;
598

    
599
#ifdef ST_THREADS
600
        st_init();
601
#endif
602
        peer = malloc(sizeof(struct peerd));
603
        if (!peer) {
604
                perror("malloc");
605
                return NULL;
606
        }
607
        peer->nr_ops = nr_ops;
608
        peer->defer_portno = defer_portno;
609
#ifdef MT
610
        peer->nr_threads = nr_threads;
611
        peer->thread = calloc(nr_threads, sizeof(struct thread));
612
        if (!peer->thread)
613
                goto malloc_fail;
614
#endif
615
        peer->peer_reqs = calloc(nr_ops, sizeof(struct peer_req));
616
        if (!peer->peer_reqs){
617
malloc_fail:
618
                perror("malloc");
619
                return NULL;
620
        }
621

    
622
        if (!xq_alloc_seq(&peer->free_reqs, nr_ops, nr_ops))
623
                goto malloc_fail;
624
#ifdef MT
625
        if (!xq_alloc_empty(&peer->threads, nr_threads))
626
                goto malloc_fail;
627
#endif
628
        if (xseg_initialize()){
629
                printf("cannot initialize library\n");
630
                return NULL;
631
        }
632
        peer->xseg = join(spec);
633
        if (!peer->xseg)
634
                return NULL;
635

    
636
        peer->portno_start = (xport) portno_start;
637
        peer->portno_end= (xport) portno_end;
638
        port = xseg_bind_port(peer->xseg, peer->portno_start, NULL);
639
        if (!port){
640
                printf("cannot bind to port %u\n", (unsigned int) peer->portno_start);
641
                return NULL;
642
        }
643

    
644
        xport p;
645
        for (p = peer->portno_start + 1; p <= peer->portno_end; p++) {
646
                struct xseg_port *tmp;
647
                tmp = xseg_bind_port(peer->xseg, p, (void *)xseg_get_signal_desc(peer->xseg, port));
648
                if (!tmp){
649
                        printf("cannot bind to port %u\n", (unsigned int) p);
650
                        return NULL;
651
                }
652
        }
653

    
654
        printf("Peer on ports  %u-%u\n", peer->portno_start,
655
                        peer->portno_end);
656

    
657
        for (i = 0; i < nr_ops; i++) {
658
                peer->peer_reqs[i].peer = peer;
659
                peer->peer_reqs[i].req = NULL;
660
                peer->peer_reqs[i].retval = 0;
661
                peer->peer_reqs[i].priv = NULL;
662
                peer->peer_reqs[i].portno = NoPort;
663

    
664
        //Plug default peerd_loop. This can change later on by custom_peer_init.
665
        peer->peerd_loop = generic_peerd_loop;
666

    
667
#ifdef ST_THREADS
668
                peer->peer_reqs[i].cond = st_cond_new(); //FIXME err check
669
#endif
670
        }
671
#ifdef MT
672
        peer->interactive_func = NULL;
673
#endif
674
        return peer;
675
}
676

    
677
int pidfile_remove(char *path, int fd)
678
{
679
        close(fd);
680
        return (unlink(path));
681
}
682

    
683
int pidfile_write(int pid_fd)
684
{
685
        char buf[16];
686
        snprintf(buf, sizeof(buf), "%ld", syscall(SYS_gettid));
687
        buf[15] = 0;
688

    
689
        lseek(pid_fd, 0, SEEK_SET);
690
        int ret = write(pid_fd, buf, strlen(buf));
691
        return ret;
692
}
693

    
694
int pidfile_read(char *path, pid_t *pid)
695
{
696
        char buf[16], *endptr;
697
        *pid = 0;
698

    
699
        int fd = open(path, O_RDONLY);
700
        if (fd < 0)
701
                return -1;
702
        int ret = read(fd, buf, 15);
703
        buf[15]=0;
704
        close(fd);
705
        if (ret < 0)
706
                return -1;
707
        else{
708
                *pid = strtol(buf, &endptr, 10);
709
                if (endptr != &buf[ret]){
710
                        *pid = 0;
711
                        return -1;
712
                }
713
        }
714
        return 0;
715
}
716

    
717
int pidfile_open(char *path, pid_t *old_pid)
718
{
719
        //nfs version > 3
720
        int fd = open(path, O_CREAT|O_EXCL|O_WRONLY, S_IWUSR);
721
        if (fd < 0){
722
                if (errno == EEXIST)
723
                        pidfile_read(path, old_pid);
724
        }
725
        return fd;
726
}
727

    
728
void usage(char *argv0)
729
{
730
        fprintf(stderr, "Usage: %s [general options] [custom peer options]\n\n", argv0);
731
        fprintf(stderr, "General peer options:\n"
732
                "  Option      | Default | \n"
733
                "  --------------------------------------------\n"
734
                "    -g        | None    | Segment spec to join\n"
735
                "    -sp       | NoPort  | Start portno to bind\n"
736
                "    -ep       | NoPort  | End portno to bind\n"
737
                "    -p        | NoPort  | Portno to bind\n"
738
                "    -n        | 16      | Number of ops\n"
739
                "    -v        | 0       | Verbosity level\n"
740
                "    -l        | None    | Logfile \n"
741
                "    -d        | No      | Daemonize \n"
742
                "    --pidfile | None    | Pidfile \n"
743
#ifdef MT
744
                "    -t        | No      | Number of threads \n"
745
#endif
746
                "\n"
747
               );
748
        custom_peer_usage();
749
}
750

    
751
int main(int argc, char *argv[])
752
{
753
        struct peerd *peer = NULL;
754
        //parse args
755
        int r;
756
        long portno_start = -1, portno_end = -1, portno = -1;
757

    
758
        //set defaults here
759
        int daemonize = 0, help = 0;
760
        uint32_t nr_ops = 16;
761
        uint32_t nr_threads = 1;
762
        unsigned int debug_level = 0;
763
        xport defer_portno = NoPort;
764
        pid_t old_pid;
765
        int pid_fd = -1;
766

    
767
        char spec[MAX_SPEC_LEN + 1];
768
        char logfile[MAX_LOGFILE_LEN + 1];
769
        char pidfile[MAX_PIDFILE_LEN + 1];
770

    
771
        logfile[0] = 0;
772
        pidfile[0] = 0;
773
        spec[0] = 0;
774

    
775
        //capture here -g spec, -n nr_ops, -p portno, -t nr_threads -v verbose level
776
        // -dp xseg_portno to defer blocking requests
777
        // -l log file ?
778
        //TODO print messages on arg parsing error
779
        BEGIN_READ_ARGS(argc, argv);
780
        READ_ARG_STRING("-g", spec, MAX_SPEC_LEN);
781
        READ_ARG_ULONG("-sp", portno_start);
782
        READ_ARG_ULONG("-ep", portno_end);
783
        READ_ARG_ULONG("-p", portno);
784
        READ_ARG_ULONG("-n", nr_ops);
785
        READ_ARG_ULONG("-v", debug_level);
786
#ifdef MT
787
        READ_ARG_ULONG("-t", nr_threads);
788
#endif
789
        READ_ARG_ULONG("-dp", defer_portno);
790
        READ_ARG_STRING("-l", logfile, MAX_LOGFILE_LEN);
791
        READ_ARG_BOOL("-d", daemonize);
792
        READ_ARG_BOOL("-h", help);
793
        READ_ARG_BOOL("--help", help);
794
        READ_ARG_STRING("--pidfile", pidfile, MAX_PIDFILE_LEN);
795
        END_READ_ARGS();
796

    
797
        if (help){
798
                usage(argv[0]);
799
                return 0;
800
        }
801

    
802
        r = init_logctx(&lc, argv[0], debug_level, logfile,
803
                        REDIRECT_STDOUT|REDIRECT_STDERR);
804
        if (r < 0){
805
                XSEGLOG("Cannot initialize logging to logfile");
806
                return -1;
807
        }
808
        XSEGLOG2(&lc, D, "Main thread has tid %ld.\n", syscall(SYS_gettid));
809

    
810
        if (pidfile[0]){
811
                pid_fd = pidfile_open(pidfile, &old_pid);
812
                if (pid_fd < 0) {
813
                        if (old_pid) {
814
                                XSEGLOG2(&lc, E, "Daemon already running, pid: %d.", old_pid);
815
                        } else {
816
                                XSEGLOG2(&lc, E, "Cannot open or create pidfile");
817
                        }
818
                        return -1;
819
                }
820
        }
821

    
822
        if (daemonize){
823
                if (daemon(0, 1) < 0){
824
                        XSEGLOG2(&lc, E, "Cannot daemonize");
825
                        r = -1;
826
                        goto out;
827
                }
828
        }
829

    
830
        pidfile_write(pid_fd);
831

    
832
        //TODO perform argument sanity checks
833
        verbose = debug_level;
834
        if (portno != -1) {
835
                portno_start = portno;
836
                portno_end = portno;
837
        }
838
        if (portno_start == -1 || portno_end == -1){
839
                XSEGLOG2(&lc, E, "Portno or {portno_start, portno_end} must be supplied");
840
                usage(argv[0]);
841
                r = -1;
842
                goto out;
843
        }
844

    
845
        peer = peerd_init(nr_ops, spec, portno_start, portno_end, nr_threads, defer_portno);
846
        if (!peer){
847
                r = -1;
848
                goto out;
849
        }
850
        setup_signals(peer);
851
        r = custom_peer_init(peer, argc, argv);
852
        if (r < 0)
853
                goto out;
854
#if defined(MT)
855
        //TODO err check
856
        peerd_start_threads(peer);
857
#elif defined(ST_THREADS)
858
        st_thread_t st = st_thread_create(init_peerd_loop, peer, 1, 0);
859
        r = st_thread_join(st, NULL);
860
#else
861
        r = init_peerd_loop(peer);
862
#endif
863
out:
864
        if (pid_fd > 0)
865
                pidfile_remove(pidfile, pid_fd);
866
        return r;
867
}