Revision d2d79e63 xseg/peers/user/cached.c

--- a/xseg/peers/user/cached.c
+++ b/xseg/peers/user/cached.c
@@ -42,44 +42,64 @@
 #include <xtypes/xlock.h>
 #include <xtypes/xq.h>
 #include <xtypes/xhash.h>
+#include <xtypes/xworkq.h>
+#include <xtypes/xwaitq.h>
 #include <xseg/protocol.h>
+#include <xtypes/xcache.h>
 
+
+/* bucket states */
 #define INVALID 0
 #define LOADING 1
 #define VALID   2
 #define DIRTY   3
+#define WRITING 4
+
+#define bucket_readable(__status) \
+	(__status == VALID || __status == DIRTY || __status == WRITING)
 
+/* write policies */
 #define WRITETHROUGH 1
 #define WRITEBACK    2
 
-struct cache_entry {
-	xlock lock;
-	uint32_t ref;
-	unsigned char *data;
-	char name[XSEG_MAX_TARGETLEN + 1];
-	uint32_t *status //len: object_size/bucket_size
-};
+
+/* object states */
+#define INVALIDATED (1 << 0)
+
+
+/* cio states */
+#define CIO_FAILED	1
+#define CIO_ACCEPTED	2
+#define CIO_READING	3
+
+#define BUCKET_SIZE_QUANTUM 4096
 
 struct cache_io {
+	uint32_t state;
+	xcache_handler h;
+	uint32_t pending_reqs;
+	struct work work;
 };
 
-struct cache {
-	xlock lock;
+struct cached {
+	xport bportno;
 	uint32_t cache_size;
+	struct xcache *cache;
+	uint64_t max_req_size;
 	uint64_t object_size;
-	uint64_t bucket_per_object;
+	uint32_t bucket_size;
+	uint32_t buckets_per_object;
 	int write_policy;
-	struct xq free_nodes;
-	xhash_t *entries;
-	struct cache_entry *nodes;
+	//scheduler
 };
 
-struct cached {
-	xport bp;
-	uint64_t max_req_size;
-	struct cache_io *ios;
-	struct xq free_ios;
-	//scheduler
+struct ce {
+	unsigned char *data;
+	uint32_t *status;
+	struct xwaitq *waitq;
+	uint32_t flags;
+	struct xlock lock;
+	struct xworkq workq;
};
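The new layout splits every cached object into fixed-size buckets: struct cached holds the geometry (object_size, bucket_size, buckets_per_object) while struct ce keeps one status word and, as init_node further down shows, one wait queue per bucket. A minimal standalone sketch of that sizing, using the default values set later in custom_peer_init (stand-in types, not part of the revision):

/* Standalone sketch: how a cache entry's per-bucket arrays are sized.
 * Stand-in values and types only; the real code uses struct ce and xcache. */
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>

#define BUCKET_SIZE_QUANTUM 4096
#define INVALID 0

int main(void)
{
	uint64_t object_size = 4 * 1024 * 1024;      /* one cached object */
	uint32_t bucket_size = BUCKET_SIZE_QUANTUM;  /* caching granularity */
	uint32_t buckets_per_object = object_size / bucket_size;

	/* one state word per bucket, mirroring ce->status[] */
	uint32_t *status = malloc(sizeof(uint32_t) * buckets_per_object);
	if (!status)
		return 1;
	for (uint32_t i = 0; i < buckets_per_object; i++)
		status[i] = INVALID;

	printf("%u buckets of %u bytes per %llu-byte object\n",
		(unsigned)buckets_per_object, (unsigned)bucket_size,
		(unsigned long long)object_size);
	free(status);
	return 0;
}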
 
 
@@ -97,111 +117,467 @@
 	return (struct cache_io *) pr->priv;
 }
 
+static uint32_t __get_bucket(struct cached *cache, uint64_t offset)
+{
+	return (offset / cache->bucket_size);
+}
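__get_bucket maps an object-relative byte offset to a bucket index; handle_read and complete_read below apply it to req->offset and req->offset + req->size/req->serviced to find the bucket range a request touches. A quick standalone check of the arithmetic, with illustrative values:

/* Standalone check of the offset-to-bucket arithmetic used above. */
#include <assert.h>
#include <stdint.h>

static uint32_t get_bucket(uint64_t offset, uint32_t bucket_size)
{
	return offset / bucket_size;
}

int main(void)
{
	uint32_t bs = 4096;                 /* BUCKET_SIZE_QUANTUM */
	assert(get_bucket(0, bs) == 0);
	assert(get_bucket(4095, bs) == 0);  /* last byte of bucket 0 */
	assert(get_bucket(4096, bs) == 1);  /* first byte of bucket 1 */
	/* a request at offset 10000 of size 20000 touches buckets 2..7 */
	assert(get_bucket(10000, bs) == 2);
	assert(get_bucket(10000 + 20000, bs) == 7);
	return 0;
}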
 
-void cache_entry_get(struct cache *cache, xqindex idx)
+static int is_loading(void *arg)
 {
-	struct cache_entry *ce = cache->entries[idx];
-	ce->ref++;
+	uint32_t *status = (uint32_t *)arg;
+	return (*status == LOADING);
 }
 
-void cache_entry_put()
+static int rw_range(struct peerd *peer, struct peer_req *pr, int action,
+		uint32_t start, uint32_t end)
 {
-	struct cache_entry *ce = cache->entries[idx];
-	ce->ref--;
-	if (!ce->ref){
-		//write dirty buckets
-		//if no pending
-		//  free cache entry
-	}
+	struct cached *cached = __get_cached(peer);
+	struct cache_io *cio = __get_cache_io(pr);
+	struct ce *ce = get_cache_entry(cached->cache, cio->h);
+
+	return 0;
 }
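rw_range is still a stub: given an action (read or write) and a bucket range, it is meant to issue a single request to the blocker covering those buckets. The byte range such a request would cover follows directly from the bucket geometry; a standalone sketch of that mapping, treating [start, end] as inclusive the way handle_read calls it (illustrative only, the actual xseg request construction is not part of this revision):

/* Standalone sketch: translate an inclusive bucket range into the byte
 * range a blocker request would have to cover. */
#include <stdint.h>
#include <stdio.h>

struct byte_range {
	uint64_t offset;
	uint64_t size;
};

static struct byte_range range_for_buckets(uint32_t start, uint32_t end,
		uint32_t bucket_size)
{
	struct byte_range r;
	r.offset = (uint64_t)start * bucket_size;
	r.size = (uint64_t)(end - start + 1) * bucket_size;
	return r;
}

int main(void)
{
	/* buckets 3..6 of an object with 4 KiB buckets */
	struct byte_range r = range_for_buckets(3, 6, 4096);
	printf("offset=%llu size=%llu\n",
		(unsigned long long)r.offset, (unsigned long long)r.size);
	return 0;
}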
 
-int cache_entry_init(struct cache *cache, xqindex idx, char *name)
+
+int on_init(void *c, void *e)
 {
-	struct cache_entry *ce = cache->entries[idx];
-	xlock_release(&ce->lock);
-	ce->ref = 1;
-	memset(ce->data, 0, cache->object_size);
-	for (i = 0; i < cache->bucket_per_object; i++) {
+	uint32_t i;
+	struct cached *cached = (struct cached *)c;
+	struct ce *ce = (struct ce *)e;
+	ce->flags = 0;
+	memset(ce->data, 0, cached->object_size);
+	for (i = 0; i < cached->buckets_per_object; i++) {
 		ce->status[i] = INVALID;
 	}
-	strncpy(ce->name, name, XSEG_MAX_TARGETLEN);
-	ce->name[XSEG_MAX_TARGETLEN] = 0;
+	xlock_release(&ce->lock);
 	return 0;
 }
 
-int cache_init(struct cache *cache, uint32_t cache_size, uint64_t object_size,
-		uint64_t bucket_size, int write_policy)
+void on_put(void *c, void *e)
 {
-	unsigned long nr_nodes = cache_size * 2;
-	xlock_release(&cache->lock);
-	cache->object_size = object_size;
-	cache->bucket_per_object = object_size/bucket_size;
-	cache->write_policy = write_policy;
-	cache->size = cache_size;
-	if (!xq_alloc_seq(&cache->free_nodes, nr_nodes, nr_nodes)){
-		return -1;
+	struct cached *cached = (struct cached *)c;
+	struct ce *ce = (struct ce *)e;
+	//since we are the last referrer to the cache entry
+	//no lock is needed.
+
+	uint32_t start, end, i = 0;
+	if (cached->write_policy == WRITETHROUGH || ce->flags & INVALIDATED)
+		return;
+	//write all dirty buckets.
+	while(i < cached->buckets_per_object){
+		if (ce->status[i] != DIRTY){
+			i++;
+			continue;
+		}
+		start = i;
+		while (i < cached->buckets_per_object &&
+			(i-start)*cached->bucket_size < cached->max_req_size &&
+				ce->status[i] == DIRTY){
+			i++;
+		}
+		end = i;
+		//problem: no associated pr
+		//maybe put one in cache entry
+		rw_range(cached, ce, 1, start, end);
 	}
-	cache->entries = xhash_new(shift, cache_size, STRING);
-	if (!cache->entries){
-		return -1;
+}
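A standalone model of the dirty-bucket coalescing loop in on_put above: consecutive DIRTY buckets are grouped into a single write, capped so that one write never exceeds max_req_size (stand-in types, printf in place of the rw_range call):

/* Standalone model of the dirty-bucket coalescing done by on_put. */
#include <stdint.h>
#include <stdio.h>

#define DIRTY 3

static void flush_dirty(uint32_t *status, uint32_t nr_buckets,
		uint32_t bucket_size, uint64_t max_req_size)
{
	uint32_t i = 0, start;

	while (i < nr_buckets) {
		if (status[i] != DIRTY) {
			i++;
			continue;
		}
		start = i;
		while (i < nr_buckets &&
			(uint64_t)(i - start) * bucket_size < max_req_size &&
			status[i] == DIRTY)
			i++;
		/* the real code would issue rw_range(..., start, i) here */
		printf("write buckets [%u, %u)\n", (unsigned)start, (unsigned)i);
	}
}

int main(void)
{
	uint32_t status[8] = {0, DIRTY, DIRTY, 0, DIRTY, DIRTY, DIRTY, 0};
	flush_dirty(status, 8, 4096, 2 * 4096); /* cap: 2 buckets per write */
	return 0;
}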
+
+void * init_node(void *c)
+{
+	int i;
+	//TODO err check
+	struct cached *cached = (struct cached *)c;
+	struct ce *ce = malloc(sizeof(struct ce));
+	xlock_release(&ce->lock);
+	ce->data = malloc(sizeof(unsigned char) * cached->object_size);
+	ce->status = malloc(sizeof(uint32_t) * cached->buckets_per_object);
+	ce->waitq = malloc(sizeof(struct xwaitq) * cached->buckets_per_object);
+	for (i = 0; i < cached->buckets_per_object; i++) {
+		xwaitq_init(&ce->waitq[i], is_loading, &ce->status[i],
+				XWAIT_SIGNAL_ONE);
 	}
-	cache->nodes = malloc(sizeof(struct cache_entry) * nr_nodes);
-	if (!cache->nodes){
-		return -1;
+	xworkq_init(&ce->workq, &ce->lock, 0);
+	return ce;
+}
+
+struct xcache_ops c_ops = {
+	.on_init = on_init,
+	.on_put  = on_put,
+	.on_node_init = init_node
+};
+
+static uint32_t __get_next_invalid(struct ce *ce, uint32_t start,
+					uint32_t limit)
+{
+	uint32_t end = start+1;
+	while (end <= limit && ce->status[end] == INVALID)
+		end++;
+	return end;
+}
+
+static void cached_fail(struct peerd *peer, struct peer_req *pr)
+{
+	struct cached *cached = __get_cached(peer);
+	struct cache_io *cio = __get_cache_io(pr);
+	if (cio->h != NoEntry){
+		xcache_put(cached->cache, cio->h);
 	}
+	cio->h = NoEntry;
+	fail(peer, pr);
+}
 
-	return 0;
+static void cached_complete(struct peerd *peer, struct peer_req *pr)
+{
+	struct cached *cached = __get_cached(peer);
+	struct cache_io *cio = __get_cache_io(pr);
+	if (cio->h != NoEntry){
+		xcache_put(cached->cache, cio->h);
+	}
+	cio->h = NoEntry;
+	complete(peer, pr);
 }
 
-xqindex cache_lookup(struct cache *cache, char *name)
+static void handle_read(void *arg);
+//is this necessary?
+static void status_changed(void *arg)
 {
-	return Noneidx;
+	/*
+	 * In this context we hold a reference to the cache entry.
+	 *
+	 * This function gets called only after the bucket at which the
+	 * current peer_req is waiting has finished loading or failed.
+	 *
+	 * Assumptions:
+	 * 	Each pr waits only at one bucket at any time. That means that
+	 * 	under no circumstances does this function get called
+	 * 	simultaneously for the same pr.
+	 */
+	struct peer_req *pr = (struct peer_req *)arg;
+	struct peerd *peer = pr->peer;
+	struct cached *cached = __get_cached(peer);
+	struct cache_io *cio = __get_cache_io(pr);
+	struct ce *ce = get_cache_entry(cached->cache, cio->h);
+
+	if (xworkq_enqueue(&ce->workq, handle_read, (void *)pr) < 0){
+		//FAIL or mark as failed ? are we the last?
+		if (cio->pending_reqs){
+			// cannot put here, since there are outstanding reqs to
+			// be received.
+			// Simply mark pr as failed.
+			cio->state = CIO_FAILED;
+		} else {
+			//safe to fail here, since there is no pending action on
+			//this pr.
+			cached_fail(peer, pr);
+		}
+	}
 }
 
-int __cache_insert(struct cache *cache, xqindex *idx)
+static void handle_read(void *arg)
 {
-	return -1;
+	/*
+	 * In this context we hold a reference to the cache entry and
+	 * the associated lock
+	 */
+
+	struct peer_req *pr = (struct peer_req *)arg;
+	struct peerd *peer = pr->peer;
+	struct cached *cached = __get_cached(peer);
+	struct cache_io *cio = __get_cache_io(pr);
+	struct xseg_request *req = pr->req;
+	struct ce *ce = get_cache_entry(cached->cache, cio->h);
+
+	uint32_t start_bucket, end_bucket, next;
+	uint32_t i, b, limit;
+
+	uint32_t pending_buckets = 0;
+
+	if (cio->state == CIO_FAILED)
+		goto out;
+
+	b = __get_bucket(cached, req->offset);
+	limit = __get_bucket(cached, req->offset + req->size);
+	//assert limit < cached->object_size
+
+	for (i = b; i < limit; i++) {
+		if (bucket_readable(ce->status[i]))
+			continue;
+		if (ce->status[i] != LOADING){
+			start_bucket = i;
+			next = __get_next_invalid(ce, start_bucket, limit);
+			end_bucket = next -1;
+			i = next;
+			if (rw_range(peer, pr, 0, start_bucket, end_bucket) < 0){
+				cio->state = CIO_FAILED;
+				break;
+			}
+			cio->pending_reqs++;
+			cio->state = CIO_READING;
+		}
+		pending_buckets++;
+	}
+
+	if (pending_buckets) {
+		/* Do not put cache entry yet */
+		cio->work.job_fn = handle_read;
+		cio->work.job = pr;
+		/* wait on the last bucket */
+		xwaitq_enqueue(&ce->waitq[end_bucket], &cio->work);
+		return;
+	}
+
+out:
+	if (cio->state == CIO_FAILED){
+		if (!cio->pending_reqs)
+			cached_fail(peer, pr);
+	}
+	else{
+		//serve req;
+		cached_complete(peer, pr);
+	}
+	return;
 }
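handle_read classifies each bucket of the request: readable buckets (VALID, DIRTY or WRITING) are skipped, a run of INVALID buckets triggers one rw_range load, and buckets already LOADING are only counted as pending so the request can later wait on a bucket's wait queue. A standalone model of that classification (stand-in types; the real code waits via xwaitq instead of printing):

/* Standalone model of the per-bucket classification done by handle_read. */
#include <stdint.h>
#include <stdio.h>

enum { INVALID, LOADING, VALID, DIRTY, WRITING };

#define bucket_readable(s) ((s) == VALID || (s) == DIRTY || (s) == WRITING)

/* scan forward over consecutive INVALID buckets, inclusive upper bound,
 * as __get_next_invalid does */
static uint32_t next_invalid(uint32_t *status, uint32_t start, uint32_t limit)
{
	uint32_t end = start + 1;
	while (end <= limit && status[end] == INVALID)
		end++;
	return end;
}

int main(void)
{
	uint32_t status[8] = {VALID, LOADING, INVALID, INVALID,
				VALID, INVALID, WRITING, DIRTY};
	uint32_t i, limit = 8;

	for (i = 0; i < limit; i++) {
		if (bucket_readable(status[i]))
			continue;
		if (status[i] != LOADING) {
			uint32_t next = next_invalid(status, i, limit - 1);
			printf("load buckets %u..%u\n", (unsigned)i,
					(unsigned)(next - 1));
			i = next;	/* continue after the issued range */
		} else {
			printf("bucket %u is loading: count as pending\n",
					(unsigned)i);
		}
	}
	return 0;
}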
 
-int cache_insert(struct cache *cache, xqindex *idx)
+static void handle_write(void *arg)
 {
+	//if writeback
+	//  	for each bucket
+	//		write all buckets
+	//		mark them as dirty
+	//	cache_put(h)
+	//	complete
+	//else
+	//	send write to blocker
+
+	/*
+	 * In this context we hold a reference to the cache entry and
+	 * the associated lock
+	 */
+
 	int r;
-	xlock_acquire(&cache->lock, 1);
-	r = __cache_insert(cache, idx);
-	xlock_release(&cache->lock);
+	struct peer_req *pr = (struct peer_req *)arg;
+	struct peerd *peer = pr->peer;
+	struct cached *cached = __get_cached(peer);
+	struct cache_io *cio = __get_cache_io(pr);
+	struct ce *ce = get_cache_entry(cached->cache, cio->h);
+	(void)ce;
+
+	if (cached->write_policy == WRITETHROUGH){
+		//send write to blocker
+		//return
+	} else if (cached->write_policy == WRITEBACK) {
+		//for each bucket
+		//	write all buckets
+		//	mark them as dirty
+		r = 0;
+	} else {
+		r = -1;
+	}
+
+out:
+	if (r < 0)
+		cached_fail(peer, pr);
+	else
+		cached_complete(peer, pr);
+	return;
+}
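handle_write is still mostly a comment plan. One possible shape for it, using only helpers already present in this file; handle_write_sketch is a hypothetical name, and it assumes a non-empty request whose bucket alignment and partially covered buckets are glossed over:

/* Sketch only: one way handle_write could follow its comment plan. */
static void handle_write_sketch(void *arg)
{
	struct peer_req *pr = (struct peer_req *)arg;
	struct peerd *peer = pr->peer;
	struct cached *cached = __get_cached(peer);
	struct cache_io *cio = __get_cache_io(pr);
	struct ce *ce = get_cache_entry(cached->cache, cio->h);
	struct xseg_request *req = pr->req;
	char *data = xseg_get_data(peer->xseg, req);
	uint32_t start = __get_bucket(cached, req->offset);
	uint32_t end = __get_bucket(cached, req->offset + req->size - 1);
	uint32_t i;

	if (cached->write_policy == WRITEBACK) {
		/* absorb the write in the cache and mark the buckets dirty */
		memcpy(ce->data + req->offset, data, req->size);
		for (i = start; i <= end; i++)
			ce->status[i] = DIRTY;
		cached_complete(peer, pr);	/* also puts cio->h */
	} else {
		/* write-through: forward the range to the blocker and
		 * complete only when the answer is received */
		if (rw_range(peer, pr, 1, start, end) < 0) {
			cached_fail(peer, pr);
			return;
		}
		cio->pending_reqs++;
	}
}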
+
+static int handle_readwrite(struct peerd *peer, struct peer_req *pr)
+{
+	int r = -1;
+	struct ce *ce;
+	struct cached *cached = __get_cached(peer);
+	char name[XSEG_MAX_TARGETLEN + 1];
+	struct xseg_request *req = pr->req;
+	char *target = xseg_get_target(peer->xseg, req);
+
+	xcache_handler h = NoEntry;
+
+	strncpy(name, target, req->targetlen);
+	name[XSEG_MAX_TARGETLEN] = 0;
+
+	h = xcache_lookup(cached->cache, name);
+	if (h == NoEntry){
+		h = xcache_alloc_init(cached->cache, name);
+		if (h == NoEntry){
+			goto out;
+		}
+		r = xcache_insert(cached->cache, h);
+		if (r < 0){
+			goto out;
+		}
+	}
+
+	ce = (struct ce *)get_cache_entry(cached->cache, h);
+	if (!ce){
+		r = -1;
+		goto out;
+	}
+
+	if (req->op == X_WRITE)
+		r = xworkq_enqueue(&ce->workq, handle_write, (void *)pr);
+	else if (req->op == X_READ)
+		r = xworkq_enqueue(&ce->workq, handle_read, (void *)pr);
+	else {
+		r = -1;
+		goto out;
+	}
+
+out:
+	if (r < 0){
+		cached_fail(peer, pr);
+	}
 	return r;
+
 }
 
-int __cache_remove(struct cache *cache, xqindex *idx)
+struct req_completion{
+	struct peer_req *pr;
+	struct xseg_request *req;
+};
+
+static void complete_read(void *arg)
 {
-	struct cache_entry *ce = cache->entries[idx];
-	return xhash_delete(cache->entries, (xhashidx)ce->name);
+	/*
+	 * In this context we hold a reference to the cache entry and
+	 * the associated lock
+	 */
+
+	struct req_completion *rc = (struct req_completion *)arg;
+	struct peer_req *pr = rc->pr;
+	struct xseg_request *req = rc->req;
+	struct peerd *peer = pr->peer;
+	struct cached *cached = __get_cached(peer);
+	struct cache_io *cio = __get_cache_io(pr);
+	struct ce *ce = get_cache_entry(cached->cache, cio->h);
+	uint32_t start, end, i;
+	int success;
+	char *data = xseg_get_data(peer->xseg, req);
+
+	/*
+	 * Synchronize pending_reqs of the cache_io here, since each cache_io
+	 * refers to only one object, and therefore we can use the object lock
+	 * to synchronize between receive contexts.
+	 */
+	cio->pending_reqs--;
+	success = (req->state == XS_SERVED && req->serviced == req->size);
+	if (!success)
+		cio->state = CIO_FAILED;
+	//assert (req->offset % cached->bucket_size) == 0;
+	//assert ((req->offset+req->serviced) % cached->bucket_size) == 0;
+	start = __get_bucket(cached, req->offset);
+	end = __get_bucket(cached, req->offset + req->serviced);
+	for (i = start; i < end; i++) {
+		if (ce->status[i] == LOADING){
+			if (success){
+				memcpy(ce->data+(i*cached->bucket_size), data,
+						cached->bucket_size);
+				ce->status[i] = VALID;
+			}
+			else {
+				//reset status
+				ce->status[i] = INVALID;
+			}
+			xwaitq_signal(&ce->waitq[i]);
+		}
+	}
+	free(rc);
 }
 
-int cache_remove(struct cache *cache, xqindex *idx)
+void complete_write(void *arg)
 {
-	int r;
-	xlock_acquire(&cache->lock, 1);
-	r = __cache_remove(cache, idx);
-	xlock_release(&cache->lock);
-	return r;
+	//for each bucket
+	//	if WRITETHROUGH
+	//		copy data to bucket
+	//		mark as valid
+	//	else if WRITEBACK
+	//		if status writing
+	//			mark as valid
+	//
+	/*
+	 * In this context we hold a reference to the cache entry and
+	 * the associated lock
+	 */
+	return;
 }
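complete_write is also still a comment plan. A sketch of how it could mirror complete_read while following that plan (complete_write_sketch is a hypothetical name; it assumes bucket-aligned writes, like the asserts in complete_read, and advances the source pointer one bucket at a time):

/* Sketch only: one way complete_write could follow its comment plan. */
static void complete_write_sketch(void *arg)
{
	struct req_completion *rc = (struct req_completion *)arg;
	struct peer_req *pr = rc->pr;
	struct xseg_request *req = rc->req;
	struct peerd *peer = pr->peer;
	struct cached *cached = __get_cached(peer);
	struct cache_io *cio = __get_cache_io(pr);
	struct ce *ce = get_cache_entry(cached->cache, cio->h);
	char *data = xseg_get_data(peer->xseg, req);
	uint32_t start = __get_bucket(cached, req->offset);
	uint32_t end = __get_bucket(cached, req->offset + req->serviced);
	uint32_t i;
	int success = (req->state == XS_SERVED && req->serviced == req->size);

	cio->pending_reqs--;
	if (!success)
		cio->state = CIO_FAILED;
	for (i = start; i < end && success; i++) {
		if (cached->write_policy == WRITETHROUGH) {
			/* the blocker has the data; cache it too */
			memcpy(ce->data + i * cached->bucket_size,
				data + (i - start) * cached->bucket_size,
				cached->bucket_size);
			ce->status[i] = VALID;
		} else if (ce->status[i] == WRITING) {
			/* a write-back flush finished for this bucket */
			ce->status[i] = VALID;
		}
	}
	free(rc);
	if (!cio->pending_reqs) {
		if (cio->state == CIO_FAILED)
			cached_fail(peer, pr);
		else
			cached_complete(peer, pr);
	}
}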
 
-int cache_invalidate(struct cache *cache, char *name)
+static int handle_receive_read(struct peerd *peer, struct peer_req *pr,
+			struct xseg_request *req)
 {
+	/*
+	 * Should be reentrant
+	 */
+	struct cached *cached = __get_cached(peer);
+	struct cache_io *cio = __get_cache_io(pr);
+	struct ce *ce = get_cache_entry(cached->cache, cio->h);
+
+	struct req_completion *rc;
+
+	rc = malloc(sizeof(struct req_completion));
+
+	rc->pr = pr;
+	rc->req = req;
+	if (xworkq_enqueue(&ce->workq, complete_read, (void *)rc) < 0){
+		free(rc);
+		//TODO WHAT?
+	}
+	return 0;
+}
+
+static int handle_receive_write(struct peerd *peer, struct peer_req *pr)
+{
+	//enqueue_work
+	return 0;
+}
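handle_receive_write could mirror handle_receive_read: wrap the answered request in a req_completion and run complete_write on the entry's work queue. A sketch under that assumption (hypothetical name; note it takes the answered req as an extra argument, like handle_receive_read does):

/* Sketch only: handle_receive_write mirroring handle_receive_read. */
static int handle_receive_write_sketch(struct peerd *peer, struct peer_req *pr,
			struct xseg_request *req)
{
	struct cached *cached = __get_cached(peer);
	struct cache_io *cio = __get_cache_io(pr);
	struct ce *ce = get_cache_entry(cached->cache, cio->h);
	struct req_completion *rc;

	rc = malloc(sizeof(struct req_completion));
	if (!rc)
		return -1;
	rc->pr = pr;
	rc->req = req;
	if (xworkq_enqueue(&ce->workq, complete_write, (void *)rc) < 0){
		free(rc);
		return -1;
	}
	return 0;
}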
+
+static int handle_delete(struct peerd *peer, struct peer_req *pr)
+{
+	//h = cache_lookup
+	//if h
+	//	cio->h = h
+	//
+	//send delete to blocker
+	return 0;
+}
+
+static int handle_receive_delete(struct peerd *peer, struct peer_req *pr)
+{
+	//if success
+	//	if cio->h
+	//		//this should not write any dirty data
+	//		xcache_remove(h)
+	return 0;
+}
+
+static int forward_req(struct peerd *peer, struct peer_req *pr)
+{
+	//get request
+	//hijack target and data
+	//submit
+	return 0;
+}
+
+static int handle_receive(struct peerd *peer, struct peer_req *pr,
+			struct xseg_request *req)
+{
+	//if not read/write/delete
+	//	put req;
+	//	complete or fail pr;
+	return 0;
 }
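handle_receive only documents its intent so far. A sketch of the dispatch it describes, assuming pr->portno identifies the port that owns req, as in other xseg peers (handle_receive_sketch is a hypothetical name):

/* Sketch only: dispatch answered requests per the comments above. */
static int handle_receive_sketch(struct peerd *peer, struct peer_req *pr,
			struct xseg_request *req)
{
	switch (req->op) {
		case X_READ:
			return handle_receive_read(peer, pr, req);
		case X_WRITE:
			return handle_receive_write(peer, pr);
		case X_DELETE:
			return handle_receive_delete(peer, pr);
		default:
			/* not ours: give the request back and fail the pr */
			xseg_put_request(peer->xseg, req, pr->portno);
			cached_fail(peer, pr);
			return -1;
	}
}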
 
 int dispatch(struct peerd *peer, struct peer_req *pr, struct xseg_request *req,
 		enum dispatch_reason reason)
 {
-	struct cached *cacher = __get_cached(peer);
-	(void) cacher;
+	struct cached *cached = __get_cached(peer);
+	(void) cached;
 	struct cache_io *cio = __get_cache_io(pr);
 	(void) cio;
 
 
-	switch reason {
+	switch (reason) {
 		case dispatch_accept:
+			cio->state = CIO_ACCEPTED;
 			break;
 		case dispatch_receive:
 			break;
@@ -217,43 +593,55 @@
 	int i;
 
 	//FIXME error checks
-	struct cacherd *cacherd = malloc(sizeof(struct cacherd));
-	peer->priv = cacherd;
-	cacher = cacherd;
-	cacher->hashmaps = xhash_new(3, STRING);
+	struct cached *cached = malloc(sizeof(struct cached));
+	cached->cache = malloc(sizeof(struct xcache));
+	xcache_init(cached->cache, cached->cache_size, &c_ops, peer);
+
+	peer->priv = cached;
 
 	for (i = 0; i < peer->nr_ops; i++) {
-		struct mapper_io *mio = malloc(sizeof(struct mapper_io));
-		mio->copyups_nodes = xhash_new(3, INTEGER);
-		mio->copyups = 0;
-		mio->err = 0;
-		mio->active = 0;
+		struct cache_io *mio = malloc(sizeof(struct cache_io));
 		peer->peer_reqs[i].priv = mio;
 	}
 
-	mapper->bportno = -1;
-	mapper->mbportno = -1;
+
+	uint32_t cache_size;
+	unsigned long bucket_size;
+	unsigned long object_size;
+	unsigned long max_req_size;
+
+	cached->bportno = -1;
+	cached->cache_size = -1;
+	cached->max_req_size = -1;
+	cached->object_size = -1;
+	cached->bucket_size = -1;
+	cached->write_policy = WRITETHROUGH;
+
 	BEGIN_READ_ARGS(argc, argv);
-	READ_ARG_ULONG("-bp", mapper->bportno);
-	READ_ARG_ULONG("-mbp", mapper->mbportno);
+	READ_ARG_ULONG("-bp", cached->bportno);
+	READ_ARG_ULONG("-cs", cached->cache_size);
+	READ_ARG_ULONG("-mrs", max_req_size);
+	READ_ARG_ULONG("-os", object_size); // In buckets
+	READ_ARG_ULONG("-bs", bucket_size); // In BUCKET_SIZE_QUANTUM
+//	READ_ARG_ULONG("-wcp", cached->write_policy);
 	END_READ_ARGS();
-	if (mapper->bportno == -1){
+	cached->bucket_size = BUCKET_SIZE_QUANTUM;
+	cached->max_req_size = 512 * 1024;
+	cached->object_size = 4 * 1024 * 1024;
+	if (cached->bportno == -1){
 		XSEGLOG2(&lc, E, "Portno for blocker must be provided");
 		usage(argv[0]);
 		return -1;
 	}
-	if (mapper->mbportno == -1){
-		XSEGLOG2(&lc, E, "Portno for mblocker must be provided");
-		usage(argv[0]);
-		return -1;
-	}
 
+	cached->buckets_per_object = cached->object_size / cached->bucket_size;
 	return 0;
 }
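Note that the parsed -mrs/-os/-bs values are immediately overridden by the hard-coded defaults above, so the geometry currently in effect is fixed. A standalone check of the derived numbers:

/* Standalone check of the geometry implied by the hard-coded defaults
 * (bucket_size = 4096, object_size = 4 MiB, max_req_size = 512 KiB). */
#include <assert.h>
#include <stdint.h>

int main(void)
{
	uint64_t object_size = 4 * 1024 * 1024;
	uint32_t bucket_size = 4096;
	uint64_t max_req_size = 512 * 1024;

	assert(object_size / bucket_size == 1024);  /* buckets_per_object */
	assert(max_req_size / bucket_size == 128);  /* buckets per blocker request */
	return 0;
}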
 
 void custom_peer_finalize(struct peerd *peer)
 {
 	//write dirty objects
+	//or cache_close(cached->cache);
 	return;
 }
 
