Statistics
| Branch: | Tag: | Revision:

root / xseg / peers / user / peer.c @ 4b114b66

History | View | Annotate | Download (22.1 kB)

1
/*
2
 * Copyright 2012 GRNET S.A. All rights reserved.
3
 *
4
 * Redistribution and use in source and binary forms, with or
5
 * without modification, are permitted provided that the following
6
 * conditions are met:
7
 *
8
 *   1. Redistributions of source code must retain the above
9
 *      copyright notice, this list of conditions and the following
10
 *      disclaimer.
11
 *   2. Redistributions in binary form must reproduce the above
12
 *      copyright notice, this list of conditions and the following
13
 *      disclaimer in the documentation and/or other materials
14
 *      provided with the distribution.
15
 *
16
 * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17
 * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18
 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19
 * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20
 * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21
 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22
 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23
 * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24
 * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25
 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26
 * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27
 * POSSIBILITY OF SUCH DAMAGE.
28
 *
29
 * The views and conclusions contained in the software and
30
 * documentation are those of the authors and should not be
31
 * interpreted as representing official policies, either expressed
32
 * or implied, of GRNET S.A.
33
 */
34

    
35
#define _GNU_SOURCE
36
#include <stdio.h>
37
#include <stdlib.h>
38
#include <sys/types.h>
39
#include <unistd.h>
40
#include <sys/syscall.h>
41
#include <sys/time.h>
42
#include <signal.h>
43
#include <sys/stat.h>
44
#include <fcntl.h>
45
#include <errno.h>
46

    
47
#ifdef MT
48
#include <pthread.h>
49
#endif
50

    
51
#include <xseg/xseg.h>
52
#include <peer.h>
53

    
54
#ifdef MT
55
#define PEER_TYPE "pthread"
56
#else
57
#define PEER_TYPE "posix"
58
#endif
59

    
60
//FIXME this should not be defined here probably
61
#define MAX_SPEC_LEN 128
62
#define MAX_PIDFILE_LEN 512
63

    
64
volatile unsigned int terminated = 0;
65
unsigned int verbose = 0;
66
struct log_ctx lc;
67
#ifdef ST_THREADS
68
uint32_t ta = 0;
69
#endif
70

    
71
#ifdef MT
72
struct peerd *global_peer;
73

    
74
struct thread {
75
        struct peerd *peer;
76
        pthread_t tid;
77
        pthread_cond_t cond;
78
        pthread_mutex_t lock;
79
        int thread_no;
80
        void (*func)(void *arg);
81
        void *arg;
82
        void *priv;
83
};
84

    
85
inline static struct thread* alloc_thread(struct peerd *peer)
86
{
87
        xqindex idx = xq_pop_head(&peer->threads, 1);
88
        if (idx == Noneidx)
89
                return NULL;
90
        return peer->thread + idx;
91
}
92

    
93
inline static void free_thread(struct peerd *peer, struct thread *t)
94
{
95
        xqindex idx = t - peer->thread;
96
        xq_append_head(&peer->threads, idx, 1);
97
}
98

    
99

    
100
inline static void __wake_up_thread(struct thread *t)
101
{
102
        pthread_mutex_lock(&t->lock);
103
        pthread_cond_signal(&t->cond);
104
        pthread_mutex_unlock(&t->lock);
105
}
106

    
107
inline static void wake_up_thread(struct thread* t)
108
{
109
        if (t){
110
                __wake_up_thread(t);
111
        }
112
}
113

    
114
inline static int wake_up_next_thread(struct peerd *peer)
115
{
116
        return (xseg_signal(peer->xseg, peer->portno_start));
117
}
118
#endif
119

    
120
/*
121
 * extern is needed if this function is going to be called by another file
122
 * such as bench-xseg.c
123
 */
124

    
125
void signal_handler(int signal)
126
{
127
        XSEGLOG2(&lc, I, "Caught signal. Terminating gracefully");
128
        terminated = 1;
129
#ifdef MT
130
        wake_up_next_thread(global_peer);
131
#endif
132
}
133

    
134
void renew_logfile(int signal)
135
{
136
        XSEGLOG2(&lc, I, "Caught signal. Renewing logfile");
137
        renew_logctx(&lc, NULL, verbose, NULL, REOPEN_FILE);
138
}
139

    
140
static int setup_signals(struct peerd *peer)
141
{
142
        int r;
143
        struct sigaction sa;
144
#ifdef MT
145
        global_peer = peer;
146
#endif
147
        sigemptyset(&sa.sa_mask);
148
        sa.sa_flags = 0;
149
        sa.sa_handler = signal_handler;
150
        r = sigaction(SIGTERM, &sa, NULL);
151
        if (r < 0)
152
                return r;
153
        r = sigaction(SIGINT, &sa, NULL);
154
        if (r < 0)
155
                return r;
156
        r = sigaction(SIGQUIT, &sa, NULL);
157
        if (r < 0)
158
                return r;
159

    
160
        sa.sa_handler = renew_logfile;
161
        r = sigaction(SIGUSR1, &sa, NULL);
162
        if (r < 0)
163
                return r;
164

    
165
        return r;
166
}
167

    
168
inline int canDefer(struct peerd *peer)
169
{
170
        return !(peer->defer_portno == NoPort);
171
}
172

    
173
void print_req(struct xseg *xseg, struct xseg_request *req)
174
{
175
        char target[64], data[64];
176
        char *req_target, *req_data;
177
        unsigned int end = (req->targetlen> 63) ? 63 : req->targetlen;
178
        req_target = xseg_get_target(xseg, req);
179
        req_data = xseg_get_data(xseg, req);
180

    
181
        if (1) {
182
                strncpy(target, req_target, end);
183
                target[end] = 0;
184
                strncpy(data, req_data, 63);
185
                data[63] = 0;
186
                printf("req id:%lu, op:%u %llu:%lu serviced: %lu, reqstate: %u\n"
187
                                "src: %u, transit: %u, dst: %u effective dst: %u\n"
188
                                "target[%u]:'%s', data[%llu]:\n%s------------------\n\n",
189
                                (unsigned long)(req),
190
                                (unsigned int)req->op,
191
                                (unsigned long long)req->offset,
192
                                (unsigned long)req->size,
193
                                (unsigned long)req->serviced,
194
                                (unsigned int)req->state,
195
                                (unsigned int)req->src_portno,
196
                                (unsigned int)req->transit_portno,
197
                                (unsigned int)req->dst_portno,
198
                                (unsigned int)req->effective_dst_portno,
199
                                (unsigned int)req->targetlen, target,
200
                                (unsigned long long)req->datalen, data);
201
        }
202
}
203
void log_pr(char *msg, struct peer_req *pr)
204
{
205
        char target[64], data[64];
206
        char *req_target, *req_data;
207
        struct peerd *peer = pr->peer;
208
        struct xseg *xseg = pr->peer->xseg;
209
        req_target = xseg_get_target(xseg, pr->req);
210
        req_data = xseg_get_data(xseg, pr->req);
211
        /* null terminate name in case of req->target is less than 63 characters,
212
         * and next character after name (aka first byte of next buffer) is not
213
         * null
214
         */
215
        unsigned int end = (pr->req->targetlen> 63) ? 63 : pr->req->targetlen;
216
        if (verbose) {
217
                strncpy(target, req_target, end);
218
                target[end] = 0;
219
                strncpy(data, req_data, 63);
220
                data[63] = 0;
221
                printf("%s: req id:%u, op:%u %llu:%lu serviced: %lu, retval: %lu, reqstate: %u\n"
222
                                "target[%u]:'%s', data[%llu]:\n%s------------------\n\n",
223
                                msg,
224
                                (unsigned int)(pr - peer->peer_reqs),
225
                                (unsigned int)pr->req->op,
226
                                (unsigned long long)pr->req->offset,
227
                                (unsigned long)pr->req->size,
228
                                (unsigned long)pr->req->serviced,
229
                                (unsigned long)pr->retval,
230
                                (unsigned int)pr->req->state,
231
                                (unsigned int)pr->req->targetlen, target,
232
                                (unsigned long long)pr->req->datalen, data);
233
        }
234
}
235

    
236
/*
237
 * free_reqs is a queue that simply contains pointer offsets to the peer_reqs
238
 * queue. If a pointer from peer_reqs is popped, we are certain that the
239
 * associated memory in peer_reqs is free to use
240
 */
241
inline struct peer_req *alloc_peer_req(struct peerd *peer)
242
{
243
        xqindex idx = xq_pop_head(&peer->free_reqs, 1);
244
        if (idx == Noneidx)
245
                return NULL;
246
        return peer->peer_reqs + idx;
247
}
248

    
249
inline void free_peer_req(struct peerd *peer, struct peer_req *pr)
250
{
251
        xqindex idx = pr - peer->peer_reqs;
252
        pr->req = NULL;
253
        xq_append_head(&peer->free_reqs, idx, 1);
254
}
255

    
256
struct timeval resp_start, resp_end, resp_accum = {0, 0};
257
uint64_t responds = 0;
258
void get_responds_stats(){
259
                printf("Time waiting respond %lu.%06lu sec for %llu times.\n",
260
                                //(unsigned int)(t - peer->thread),
261
                                resp_accum.tv_sec, resp_accum.tv_usec, (long long unsigned int) responds);
262
}
263

    
264
//FIXME error check
265
void fail(struct peerd *peer, struct peer_req *pr)
266
{
267
        struct xseg_request *req = pr->req;
268
        uint32_t p;
269
        XSEGLOG2(&lc, D, "failing req %u", (unsigned int) (pr - peer->peer_reqs));
270
        req->state |= XS_FAILED;
271
        //xseg_set_req_data(peer->xseg, pr->req, NULL);
272
        p = xseg_respond(peer->xseg, req, pr->portno, X_ALLOC);
273
        xseg_signal(peer->xseg, p);
274
        free_peer_req(peer, pr);
275
#ifdef MT
276
        wake_up_next_thread(peer);
277
#endif
278
}
279

    
280
//FIXME error check
281
void complete(struct peerd *peer, struct peer_req *pr)
282
{
283
        struct xseg_request *req = pr->req;
284
        uint32_t p;
285
        req->state |= XS_SERVED;
286
        //xseg_set_req_data(peer->xseg, pr->req, NULL);
287
        //gettimeofday(&resp_start, NULL);
288
        p = xseg_respond(peer->xseg, req, pr->portno, X_ALLOC);
289
        //gettimeofday(&resp_end, NULL);
290
        //responds++;
291
        //timersub(&resp_end, &resp_start, &resp_end);
292
        //timeradd(&resp_end, &resp_accum, &resp_accum);
293
        //printf("xseg_signal: %u\n", p);
294
        xseg_signal(peer->xseg, p);
295
        free_peer_req(peer, pr);
296
#ifdef MT
297
        wake_up_next_thread(peer);
298
#endif
299
}
300

    
301
static void handle_accepted(struct peerd *peer, struct peer_req *pr,
302
                                struct xseg_request *req)
303
{
304
        struct xseg_request *xreq = pr->req;
305
        //assert xreq == req;
306
        XSEGLOG2(&lc, D, "Handle accepted");
307
        xreq->serviced = 0;
308
        //xreq->state = XS_ACCEPTED;
309
        pr->retval = 0;
310
        dispatch(peer, pr, req, dispatch_accept);
311
}
312

    
313
static void handle_received(struct peerd *peer, struct peer_req *pr,
314
                                struct xseg_request *req)
315
{
316
        //struct xseg_request *req = pr->req;
317
        //assert req->state != XS_ACCEPTED;
318
        XSEGLOG2(&lc, D, "Handle received \n");
319
        dispatch(peer, pr, req, dispatch_receive);
320

    
321
}
322
struct timeval sub_start, sub_end, sub_accum = {0, 0};
323
uint64_t submits = 0;
324
void get_submits_stats(){
325
                printf("Time waiting submit %lu.%06lu sec for %llu times.\n",
326
                                //(unsigned int)(t - peer->thread),
327
                                sub_accum.tv_sec, sub_accum.tv_usec, (long long unsigned int) submits);
328
}
329

    
330
int submit_peer_req(struct peerd *peer, struct peer_req *pr)
331
{
332
        uint32_t ret;
333
        struct xseg_request *req = pr->req;
334
        // assert req->portno == peer->portno ?
335
        //TODO small function with error checking
336
        XSEGLOG2 (&lc, D, "submitting peer req %u\n", (unsigned int)(pr - peer->peer_reqs));
337
        ret = xseg_set_req_data(peer->xseg, req, (void *)(pr));
338
        if (ret < 0)
339
                return -1;
340
        //printf("pr: %x , req_data: %x \n", pr, xseg_get_req_data(peer->xseg, req));
341
        //gettimeofday(&sub_start, NULL);
342
        ret = xseg_submit(peer->xseg, req, pr->portno, X_ALLOC);
343
        //gettimeofday(&sub_end, NULL);
344
        //submits++;
345
        //timersub(&sub_end, &sub_start, &sub_end);
346
        //timeradd(&sub_end, &sub_accum, &sub_accum);
347
        if (ret == NoPort)
348
                return -1;
349
        xseg_signal(peer->xseg, ret);
350
        return 0;
351
}
352

    
353
int check_ports(struct peerd *peer)
354
{
355
        struct xseg *xseg = peer->xseg;
356
        xport portno_start = peer->portno_start;
357
        xport portno_end = peer->portno_end;
358
        struct xseg_request *accepted, *received;
359
        struct peer_req *pr;
360
        xport i;
361
        int  r, c = 0;
362

    
363
        for (i = portno_start; i <= portno_end; i++) {
364
                accepted = NULL;
365
                received = NULL;
366
                //Shouldn't we just leave?
367
                if (!isTerminate()) {
368
                        //Better way than alloc/free all the time?
369
                        //Cache the allocated peer_req?
370
                        pr = alloc_peer_req(peer);
371
                        if (pr) {
372
                                accepted = xseg_accept(xseg, i, X_NONBLOCK);
373
                                if (accepted) {
374
                                        pr->req = accepted;
375
                                        pr->portno = i;
376
                                        xseg_cancel_wait(xseg, i);
377
                                        handle_accepted(peer, pr, accepted);
378
                                        c = 1;
379
                                }
380
                                else {
381
                                        free_peer_req(peer, pr);
382
                                }
383
                        }
384
                }
385
                received = xseg_receive(xseg, i, X_NONBLOCK);
386
                if (received) {
387
                        r =  xseg_get_req_data(xseg, received, (void **) &pr);
388
                        if (r < 0 || !pr){
389
                                XSEGLOG2(&lc, W, "Received request with no pr data\n");
390
                                xport p = xseg_respond(peer->xseg, received, peer->portno_start, X_ALLOC);
391
                                if (p == NoPort){
392
                                        XSEGLOG2(&lc, W, "Could not respond stale request");
393
                                        xseg_put_request(xseg, received, portno_start);
394
                                        continue;
395
                                } else {
396
                                        xseg_signal(xseg, p);
397
                                }
398
                        } else {
399
                                //maybe perform sanity check for pr
400
                                xseg_cancel_wait(xseg, i);
401
                                handle_received(peer, pr, received);
402
                                c = 1;
403
                        }
404
                }
405
        }
406

    
407
        return c;
408
}
409

    
410
#ifdef MT
411
static void* thread_loop(void *arg)
412
{
413
        struct thread *t = (struct thread *) arg;
414
        struct peerd *peer = t->peer;
415
        struct xseg *xseg = peer->xseg;
416
        xport portno_start = peer->portno_start;
417
        xport portno_end = peer->portno_end;
418
        pid_t pid =syscall(SYS_gettid);
419
        uint64_t loops;
420
        uint64_t threshold=1000/(1 + portno_end - portno_start);
421

    
422
        XSEGLOG2(&lc, D, "thread %u\n",  (unsigned int) (t- peer->thread));
423
        XSEGLOG2(&lc, I, "Thread %u has tid %u.\n", (unsigned int) (t- peer->thread), pid);
424
        xseg_init_local_signal(xseg, peer->portno_start);
425
        for (;!(isTerminate() && xq_count(&peer->free_reqs) == peer->nr_ops);) {
426
                for(loops =  threshold; loops > 0; loops--) {
427
                        if (loops == 1)
428
                                xseg_prepare_wait(xseg, peer->portno_start);
429
                        if (check_ports(peer))
430
                                loops = threshold;
431
                }
432
                XSEGLOG2(&lc, I, "Thread %u goes to sleep\n", (unsigned int) (t- peer->thread));
433
                xseg_wait_signal(xseg, 10000000UL);
434
                xseg_cancel_wait(xseg, peer->portno_start);
435
                XSEGLOG2(&lc, I, "Thread %u woke up\n", (unsigned int) (t- peer->thread));
436
        }
437
        wake_up_next_thread(peer);
438
        custom_peer_finalize(peer);
439
        return NULL;
440
}
441

    
442
void *init_thread_loop(void *arg)
443
{
444
        struct thread *t = (struct thread *) arg;
445
        struct peerd *peer = t->peer;
446
        char *thread_id;
447
        int i;
448

    
449
        /*
450
         * We need an identifier for every thread that will spin in peerd_loop.
451
         * The following code is a way to create a string of this format:
452
         *                "Thread <num>"
453
         * minus the null terminator. What we do is we create this string with
454
         * snprintf and then resize it to exclude the null terminator with
455
         * realloc. Finally, the result string is passed to the (void *arg) field
456
         * of struct thread.
457
         *
458
         * Since the highest thread number can't be more than 5 digits, using 13
459
         * chars should be more than enough.
460
         */
461
        thread_id = malloc(13 * sizeof(char));
462
        snprintf(thread_id, 13, "Thread %ld", t - t->peer->thread);
463
        for (i = 0; thread_id[i]; i++) {}
464
        t->arg = (void *)realloc(thread_id, i-1);
465

    
466
        //Start thread loop
467
        (void)peer->peerd_loop(t);
468

    
469
        wake_up_next_thread(peer);
470
        custom_peer_finalize(peer);
471

    
472
        return NULL;
473
}
474

    
475
int peerd_start_threads(struct peerd *peer)
476
{
477
        int i;
478
        uint32_t nr_threads = peer->nr_threads;
479
        //TODO err check
480
        for (i = 0; i < nr_threads; i++) {
481
                peer->thread[i].func = NULL;
482
                peer->thread[i].arg = NULL;
483
                peer->thread[i].peer = peer;
484
                pthread_cond_init(&peer->thread[i].cond,NULL);
485
                pthread_mutex_init(&peer->thread[i].lock, NULL);
486
                pthread_create(&peer->thread[i].tid, NULL,
487
                                        init_thread_loop, (void *)(peer->thread + i));
488
        }
489

    
490
        if (peer->interactive_func)
491
                peer->interactive_func();
492
        for (i = 0; i < nr_threads; i++) {
493
                pthread_join(peer->thread[i].tid, NULL);
494
        }
495

    
496
        return 0;
497
}
498
#endif
499

    
500
int defer_request(struct peerd *peer, struct peer_req *pr)
501
{
502
        int r;
503
        xport p;
504
        if (!canDefer(peer)){
505
                XSEGLOG2(&lc, E, "Peer cannot defer requests");
506
                return -1;
507
        }
508
        p = xseg_forward(peer->xseg, pr->req, peer->defer_portno, pr->portno,
509
                        X_ALLOC);
510
        if (p == NoPort){
511
                XSEGLOG2(&lc, E, "Cannot defer request %lx", pr->req);
512
                return -1;
513
        }
514
        r = xseg_signal(peer->xseg, p);
515
        if (r < 0) {
516
                XSEGLOG2(&lc, W, "Cannot signal port %lu", p);
517
        }
518
        free_peer_req(peer, pr);
519
        return 0;
520
}
521

    
522
/*
523
 * generic_peerd_loop is a general-purpose port-checker loop that is
524
 * suitable both for multi-threaded and single-threaded peers.
525
 */
526
static int generic_peerd_loop(void *arg)
527
{
528
#ifdef MT
529
        struct thread *t = (struct thread *) arg;
530
        struct peerd *peer = t->peer;
531
        char *id = t->arg;
532
#else
533
        struct peerd *peer = (struct peerd *) arg;
534
        char id[4] = {'P','e','e','r'};
535
#endif
536
        struct xseg *xseg = peer->xseg;
537
        xport portno_start = peer->portno_start;
538
        xport portno_end = peer->portno_end;
539
        pid_t pid = syscall(SYS_gettid);
540
        uint64_t threshold=1000/(1 + portno_end - portno_start);
541
        uint64_t loops;
542

    
543
        XSEGLOG2(&lc, I, "%s has tid %u.\n", id, pid);
544
        xseg_init_local_signal(xseg, peer->portno_start);
545
        for (;!(isTerminate() && xq_count(&peer->free_reqs) == peer->nr_ops);) {
546
#ifdef MT
547
                if (t->func) {
548
                        XSEGLOG2(&lc, D, "%s executes function\n", id);
549
                        xseg_cancel_wait(xseg, peer->portno_start);
550
                        t->func(t->arg);
551
                        t->func = NULL;
552
                        t->arg = NULL;
553
                        continue;
554
                }
555
#endif
556
                //Heart of peerd_loop. This loop is common for everyone.
557
                for(loops = threshold; loops > 0; loops--) {
558
                        if (check_ports(peer))
559
                                loops = threshold;
560
                }
561
                xseg_prepare_wait(xseg, peer->portno_start);
562
#ifdef ST_THREADS
563
                if (ta){
564
                        st_sleep(0);
565
                        continue;
566
                }
567
#endif
568
                XSEGLOG2(&lc, I, "%s goes to sleep\n", id);
569
                xseg_wait_signal(xseg, 10000000UL);
570
                xseg_cancel_wait(xseg, peer->portno_start);
571
                XSEGLOG2(&lc, I, "%s woke up\n", id);
572
        }
573
        return 0;
574
}
575

    
576
static int init_peerd_loop(struct peerd *peer)
577
{
578
        struct xseg *xseg = peer->xseg;
579

    
580
        peer->peerd_loop(peer);
581
        custom_peer_finalize(peer);
582
        xseg_quit_local_signal(xseg, peer->portno_start);
583

    
584
        return 0;
585
}
586

    
587
static struct xseg *join(char *spec)
588
{
589
        struct xseg_config config;
590
        struct xseg *xseg;
591

    
592
        (void)xseg_parse_spec(spec, &config);
593
        xseg = xseg_join(config.type, config.name, PEER_TYPE, NULL);
594
        if (xseg)
595
                return xseg;
596

    
597
        (void)xseg_create(&config);
598
        return xseg_join(config.type, config.name, PEER_TYPE, NULL);
599
}
600

    
601
static struct peerd* peerd_init(uint32_t nr_ops, char* spec, long portno_start,
602
                        long portno_end, uint32_t nr_threads, xport defer_portno)
603
{
604
        int i;
605
        struct peerd *peer;
606
        struct xseg_port *port;
607
        void *sd = NULL;
608
        xport p;
609

    
610
#ifdef ST_THREADS
611
        st_init();
612
#endif
613
        peer = malloc(sizeof(struct peerd));
614
        if (!peer) {
615
                perror("malloc");
616
                return NULL;
617
        }
618
        peer->nr_ops = nr_ops;
619
        peer->defer_portno = defer_portno;
620
#ifdef MT
621
        peer->nr_threads = nr_threads;
622
        peer->thread = calloc(nr_threads, sizeof(struct thread));
623
        if (!peer->thread)
624
                goto malloc_fail;
625
#endif
626
        peer->peer_reqs = calloc(nr_ops, sizeof(struct peer_req));
627
        if (!peer->peer_reqs){
628
malloc_fail:
629
                perror("malloc");
630
                return NULL;
631
        }
632

    
633
        if (!xq_alloc_seq(&peer->free_reqs, nr_ops, nr_ops))
634
                goto malloc_fail;
635
#ifdef MT
636
        if (!xq_alloc_empty(&peer->threads, nr_threads))
637
                goto malloc_fail;
638
#endif
639
        if (xseg_initialize()){
640
                printf("cannot initialize library\n");
641
                return NULL;
642
        }
643
        peer->xseg = join(spec);
644
        if (!peer->xseg)
645
                return NULL;
646

    
647
        peer->portno_start = (xport) portno_start;
648
        peer->portno_end= (xport) portno_end;
649

    
650
        /*
651
         * Start binding ports from portno_start to portno_end.
652
         * The first port we bind will have its signal_desc initialized by xseg
653
         * and the same signal_desc will be used for all the other ports.
654
         */
655
        for (p = peer->portno_start; p <= peer->portno_end; p++) {
656
                port = xseg_bind_port(peer->xseg, p, sd);
657
                if (!port){
658
                        printf("cannot bind to port %u\n", (unsigned int) p);
659
                        return NULL;
660
                }
661
                if (p == peer->portno_start)
662
                        sd = xseg_get_signal_desc(peer->xseg, port);
663
        }
664

    
665
        printf("Peer on ports  %u-%u\n", peer->portno_start, peer->portno_end);
666

    
667
        for (i = 0; i < nr_ops; i++) {
668
                peer->peer_reqs[i].peer = peer;
669
                peer->peer_reqs[i].req = NULL;
670
                peer->peer_reqs[i].retval = 0;
671
                peer->peer_reqs[i].priv = NULL;
672
                peer->peer_reqs[i].portno = NoPort;
673

    
674
        //Plug default peerd_loop. This can change later on by custom_peer_init.
675
        peer->peerd_loop = generic_peerd_loop;
676

    
677
#ifdef ST_THREADS
678
                peer->peer_reqs[i].cond = st_cond_new(); //FIXME err check
679
#endif
680
        }
681
#ifdef MT
682
        peer->interactive_func = NULL;
683
#endif
684
        return peer;
685
}
686

    
687
int pidfile_remove(char *path, int fd)
688
{
689
        close(fd);
690
        return (unlink(path));
691
}
692

    
693
int pidfile_write(int pid_fd)
694
{
695
        char buf[16];
696
        snprintf(buf, sizeof(buf), "%ld", syscall(SYS_gettid));
697
        buf[15] = 0;
698

    
699
        lseek(pid_fd, 0, SEEK_SET);
700
        int ret = write(pid_fd, buf, strlen(buf));
701
        return ret;
702
}
703

    
704
int pidfile_read(char *path, pid_t *pid)
705
{
706
        char buf[16], *endptr;
707
        *pid = 0;
708

    
709
        int fd = open(path, O_RDONLY);
710
        if (fd < 0)
711
                return -1;
712
        int ret = read(fd, buf, 15);
713
        buf[15]=0;
714
        close(fd);
715
        if (ret < 0)
716
                return -1;
717
        else{
718
                *pid = strtol(buf, &endptr, 10);
719
                if (endptr != &buf[ret]){
720
                        *pid = 0;
721
                        return -1;
722
                }
723
        }
724
        return 0;
725
}
726

    
727
int pidfile_open(char *path, pid_t *old_pid)
728
{
729
        //nfs version > 3
730
        int fd = open(path, O_CREAT|O_EXCL|O_WRONLY, S_IWUSR);
731
        if (fd < 0){
732
                if (errno == EEXIST)
733
                        pidfile_read(path, old_pid);
734
        }
735
        return fd;
736
}
737

    
738
void usage(char *argv0)
739
{
740
        fprintf(stderr, "Usage: %s [general options] [custom peer options]\n\n", argv0);
741
        fprintf(stderr, "General peer options:\n"
742
                "  Option      | Default | \n"
743
                "  --------------------------------------------\n"
744
                "    -g        | None    | Segment spec to join\n"
745
                "    -sp       | NoPort  | Start portno to bind\n"
746
                "    -ep       | NoPort  | End portno to bind\n"
747
                "    -p        | NoPort  | Portno to bind\n"
748
                "    -n        | 16      | Number of ops\n"
749
                "    -v        | 0       | Verbosity level\n"
750
                "    -l        | None    | Logfile \n"
751
                "    -d        | No      | Daemonize \n"
752
                "    --pidfile | None    | Pidfile \n"
753
#ifdef MT
754
                "    -t        | No      | Number of threads \n"
755
#endif
756
                "\n"
757
               );
758
        custom_peer_usage();
759
}
760

    
761
int main(int argc, char *argv[])
762
{
763
        struct peerd *peer = NULL;
764
        //parse args
765
        int r;
766
        long portno_start = -1, portno_end = -1, portno = -1;
767

    
768
        //set defaults here
769
        int daemonize = 0, help = 0;
770
        uint32_t nr_ops = 16;
771
        uint32_t nr_threads = 1;
772
        unsigned int debug_level = 0;
773
        xport defer_portno = NoPort;
774
        pid_t old_pid;
775
        int pid_fd = -1;
776

    
777
        char spec[MAX_SPEC_LEN + 1];
778
        char logfile[MAX_LOGFILE_LEN + 1];
779
        char pidfile[MAX_PIDFILE_LEN + 1];
780

    
781
        logfile[0] = 0;
782
        pidfile[0] = 0;
783
        spec[0] = 0;
784

    
785
        //capture here -g spec, -n nr_ops, -p portno, -t nr_threads -v verbose level
786
        // -dp xseg_portno to defer blocking requests
787
        // -l log file ?
788
        //TODO print messages on arg parsing error
789
        BEGIN_READ_ARGS(argc, argv);
790
        READ_ARG_STRING("-g", spec, MAX_SPEC_LEN);
791
        READ_ARG_ULONG("-sp", portno_start);
792
        READ_ARG_ULONG("-ep", portno_end);
793
        READ_ARG_ULONG("-p", portno);
794
        READ_ARG_ULONG("-n", nr_ops);
795
        READ_ARG_ULONG("-v", debug_level);
796
#ifdef MT
797
        READ_ARG_ULONG("-t", nr_threads);
798
#endif
799
        READ_ARG_ULONG("-dp", defer_portno);
800
        READ_ARG_STRING("-l", logfile, MAX_LOGFILE_LEN);
801
        READ_ARG_BOOL("-d", daemonize);
802
        READ_ARG_BOOL("-h", help);
803
        READ_ARG_BOOL("--help", help);
804
        READ_ARG_STRING("--pidfile", pidfile, MAX_PIDFILE_LEN);
805
        END_READ_ARGS();
806

    
807
        if (help){
808
                usage(argv[0]);
809
                return 0;
810
        }
811

    
812
        r = init_logctx(&lc, argv[0], debug_level, logfile,
813
                        REDIRECT_STDOUT|REDIRECT_STDERR);
814
        if (r < 0){
815
                XSEGLOG("Cannot initialize logging to logfile");
816
                return -1;
817
        }
818
        XSEGLOG2(&lc, D, "Main thread has tid %ld.\n", syscall(SYS_gettid));
819

    
820
        if (pidfile[0]){
821
                pid_fd = pidfile_open(pidfile, &old_pid);
822
                if (pid_fd < 0) {
823
                        if (old_pid) {
824
                                XSEGLOG2(&lc, E, "Daemon already running, pid: %d.", old_pid);
825
                        } else {
826
                                XSEGLOG2(&lc, E, "Cannot open or create pidfile");
827
                        }
828
                        return -1;
829
                }
830
        }
831

    
832
        if (daemonize){
833
                if (daemon(0, 1) < 0){
834
                        XSEGLOG2(&lc, E, "Cannot daemonize");
835
                        r = -1;
836
                        goto out;
837
                }
838
        }
839

    
840
        pidfile_write(pid_fd);
841

    
842
        //TODO perform argument sanity checks
843
        verbose = debug_level;
844
        if (portno != -1) {
845
                portno_start = portno;
846
                portno_end = portno;
847
        }
848
        if (portno_start == -1 || portno_end == -1){
849
                XSEGLOG2(&lc, E, "Portno or {portno_start, portno_end} must be supplied");
850
                usage(argv[0]);
851
                r = -1;
852
                goto out;
853
        }
854

    
855
        peer = peerd_init(nr_ops, spec, portno_start, portno_end, nr_threads, defer_portno);
856
        if (!peer){
857
                r = -1;
858
                goto out;
859
        }
860
        setup_signals(peer);
861
        r = custom_peer_init(peer, argc, argv);
862
        if (r < 0)
863
                goto out;
864
#if defined(MT)
865
        //TODO err check
866
        peerd_start_threads(peer);
867
#elif defined(ST_THREADS)
868
        st_thread_t st = st_thread_create(init_peerd_loop, peer, 1, 0);
869
        r = st_thread_join(st, NULL);
870
#else
871
        r = init_peerd_loop(peer);
872
#endif
873
out:
874
        if (pid_fd > 0)
875
                pidfile_remove(pidfile, pid_fd);
876
        return r;
877
}