Revision 8107a952

b/xseg/peers/user/bench-xseg.c
32 32
 * or implied, of GRNET S.A.
33 33
 */
34 34

  
35
#define _GNU_SOURCE
35 36
#include <stdio.h>
37
#include <stdlib.h>
36 38
#include <unistd.h>
39
#include <sys/syscall.h>
37 40
#include <sys/types.h>
38 41
#include <pthread.h>
39 42
#include <xseg/xseg.h>
40 43
#include <peer.h>
41 44
#include <time.h>
42 45
#include <sys/util.h>
46
#include <signal.h>
43 47

  
44 48
struct timespec delay = {0, 4000000};
45 49

  
......
48 52
	uint64_t os; //Object size
49 53
	uint64_t bs; //Block size
50 54
	uint32_t iodepth; //Num of in-flight xseg reqs
55
	xport dst_port;
51 56
	uint8_t flags;
52 57
};
53 58

  
59
int custom_peerd_loop(struct peerd *peer);
60

  
54 61
#define MAX_ARG_LEN 10
55 62

  
56 63
void custom_peer_usage()
......
60 67
		"    -ts       | None    | Total I/O size\n"
61 68
		"    -os       | 4M      | Object size\n"
62 69
		"    -bs       | 4k      | Block size\n"
70
		"    -dp       | None    | Destination port\n"
63 71
		"    --iodepth | 1       | Number of in-flight I/O requests\n"
64 72
		"\n");
65 73
}
......
105 113
	char block_size[MAX_ARG_LEN + 1];
106 114
	struct xseg *xseg = peer->xseg;
107 115
	unsigned int xseg_page_size = 1 << xseg->config.page_shift;
116
	long dst_port = -1;
108 117

  
109 118
	total_size[0] = 0;
110 119
	block_size[0] = 0;
......
122 131
	READ_ARG_STRING("-os", object_size, MAX_ARG_LEN);
123 132
	READ_ARG_STRING("-bs", block_size, MAX_ARG_LEN);
124 133
	READ_ARG_ULONG("--iodepth", prefs->iodepth);
134
	READ_ARG_ULONG("-dp", dst_port);
125 135
	END_READ_ARGS();
126 136

  
137
	/*
138
	 *************************
139
	 * Check size parameters *
140
	 *************************
141
	 */
142

  
127 143
	//Block size (bs): Defaults to 4K.
128 144
	//It must be a number followed by one of these characters: [k|K|m|M|g|G].
129 145
	//If not, it will be considered as size in bytes.
......
175 191
		goto arg_fail;
176 192
	}
177 193

  
194
	/*
195
	 *************************
196
	 * Check port parameters *
197
	 *************************
198
	 */
199

  
200
	if (dst_port < 0){
201
		XSEGLOG2(&lc, E, "Destination port needs to be supplied\n");
202
		goto arg_fail;
203
	}
204

  
205
	prefs->dst_port = (xport) dst_port;
206

  
207
	/*
208
	 **************************
209
	 * Customize struct peerd *
210
	 **************************
211
	 */
212

  
213
	peer->custom_peerd_loop = custom_peerd_loop;
214
	peer->priv = (void *) prefs;
178 215
	return 0;
179 216

  
180 217
arg_fail:
......
183 220
	return -1;
184 221
}
185 222

  
223
/*
224
 * This function substitutes the default peerd_loop of peer.c.
225
 * This is achieved by passing to gcc a CUSTOM_LOOP macro definition which is
226
 * checked (ifdef) in peer.c
227
 */
228
int custom_peerd_loop(struct peerd *peer)
229
{
230
#ifdef MT
231
	int i;
232
	if (peer->interactive_func)
233
		peer->interactive_func();
234
	for (i = 0; i < peer->nr_threads; i++) {
235
		pthread_join(peer->thread[i].tid, NULL);
236
	}
237
#else
238
	struct xseg *xseg = peer->xseg;
239
	struct bench *prefs = peer->priv;
240

  
241
	xport portno_start = peer->portno_start;
242
	xport portno_end = peer->portno_end;
243
	uint64_t threshold=1000/(1 + portno_end - portno_start);
244
	pid_t pid =syscall(SYS_gettid);
245
	uint64_t loops;
246

  
247
	uint64_t remaining = prefs->ts;
248

  
249
	XSEGLOG2(&lc, I, "Peer has tid %u.\n", pid);
250
	xseg_init_local_signal(xseg, peer->portno_start);
251

  
252
	while (!isTerminate()
253
			&& xq_count(&peer->free_reqs) == peer->nr_ops
254
			&& remaining) {
255
		for (loops= threshold; loops > 0; loops--) {
256
			if (loops == 1)
257
				xseg_prepare_wait(xseg, peer->portno_start);
258
			if (check_ports(peer))
259
				loops = threshold;
260
		}
261
#ifdef ST_THREADS
262
		if (ta){
263
			st_sleep(0);
264
			continue;
265
		}
266
#endif
267
		XSEGLOG2(&lc, I, "Peer goes to sleep\n");
268
		xseg_wait_signal(xseg, 10000000UL);
269
		xseg_cancel_wait(xseg, peer->portno_start);
270
		XSEGLOG2(&lc, I, "Peer woke up\n");
271
	}
272
	custom_peer_finalize(peer);
273
	xseg_quit_local_signal(xseg, peer->portno_start);
274
#endif
275
	return 0;
276
}
277

  
186 278
void custom_peer_finalize(struct peerd *peer)
187 279
{
188 280
	return;
b/xseg/peers/user/peer.c
80 80
	void *arg;
81 81
};
82 82

  
83

  
84 83
inline static struct thread* alloc_thread(struct peerd *peer)
85 84
{
86 85
	xqindex idx = xq_pop_head(&peer->threads, 1);
......
116 115
}
117 116
#endif
118 117

  
119

  
120
static inline int isTerminate()
118
/*
119
 * extern is needed if this function is going to be called by another file
120
 * such as bench-xseg.c
121
 */
122
inline extern int isTerminate()
121 123
{
122 124
/* ta doesn't need to be taken into account, because the main loops
123 125
 * doesn't check the terminated flag if ta is not 0.
......
355 357
	return 0;
356 358
}
357 359

  
358
static int check_ports(struct peerd *peer)
360
int check_ports(struct peerd *peer)
359 361
{
360 362
	struct xseg *xseg = peer->xseg;
361 363
	xport portno_start = peer->portno_start;
......
439 441
	XSEGLOG2(&lc, I, "Thread %u has tid %u.\n", (unsigned int) (t- peer->thread), pid);
440 442
	xseg_init_local_signal(xseg, peer->portno_start);
441 443
	for (;!(isTerminate() && xq_count(&peer->free_reqs) == peer->nr_ops);) {
444
		XSEGLOG("Head of loop.\n");
442 445
		if (t->func) {
443 446
			XSEGLOG2(&lc, D, "Thread %u executes function\n", (unsigned int) (t- peer->thread));
444 447
			xseg_cancel_wait(xseg, peer->portno_start);
......
448 451
			continue;
449 452
		}
450 453

  
451
		for(loops= threshold; loops > 0; loops--) {
454
		for(loops =  threshold; loops > 0; loops--) {
452 455
			if (loops == 1)
453 456
				xseg_prepare_wait(xseg, peer->portno_start);
454 457
			if (check_ports(peer))
......
505 508
	return 0;
506 509
}
507 510

  
508
static int peerd_loop(struct peerd *peer) 
511
static int peerd_loop(struct peerd *peer)
509 512
{
510 513
#ifdef MT
511 514
	int i;
......
605 608
		return NULL;
606 609
	}
607 610
	peer->xseg = join(spec);
608
	if (!peer->xseg) 
611
	if (!peer->xseg)
609 612
		return NULL;
610 613

  
611 614
	peer->portno_start = (xport) portno_start;
......
831 834
	st_thread_t st = st_thread_create(peerd_loop, peer, 1, 0);
832 835
	r = st_thread_join(st, NULL);
833 836
#else
834
	r = peerd_loop(peer);
837
	if (peer->custom_peerd_loop)
838
		r = peer->custom_peerd_loop(peer);
839
	else
840
		r = peerd_loop(peer);
835 841
#endif
836 842
out:
837 843
	if (pid_fd > 0)
b/xseg/peers/user/peer.h
93 93
	xport defer_portno;
94 94
	struct peer_req *peer_reqs;
95 95
	struct xq free_reqs;
96
	int (*custom_peerd_loop)(struct peerd *peer);
96 97
	void *priv;
97 98
#ifdef MT
98 99
	uint32_t nr_threads;
......
121 122
void get_responds_stats();
122 123
void usage();
123 124
void print_req(struct xseg *xseg, struct xseg_request *req);
125
int isTerminate();
126
int check_ports(struct peerd *peer);
127

  
124 128
#ifdef MT
125 129
int thread_execute(struct peerd *peer, void (*func)(void *arg), void *arg);
126 130
#endif

Also available in: Unified diff