Revision d2d79e63 xseg/peers/user/cached.c
old | new | b/xseg/peers/user/cached.c
----|-----|---------------------------
 42 |  42 | #include <xtypes/xlock.h>
 43 |  43 | #include <xtypes/xq.h>
 44 |  44 | #include <xtypes/xhash.h>
    |  45 | #include <xtypes/xworkq.h>
    |  46 | #include <xtypes/xwaitq.h>
 45 |  47 | #include <xseg/protocol.h>
    |  48 | #include <xtypes/xcache.h>
 46 |  49 |
    |  50 |
    |  51 | /* bucket states */
 47 |  52 | #define INVALID 0
 48 |  53 | #define LOADING 1
 49 |  54 | #define VALID 2
 50 |  55 | #define DIRTY 3
    |  56 | #define WRITING 4
    |  57 |
    |  58 | #define bucket_readable(__status) \
    |  59 |     (__status == VALID || __status == DIRTY || __status == WRITING)
 51 |  60 |
    |  61 | /* write policies */
 52 |  62 | #define WRITETHROUGH 1
 53 |  63 | #define WRITEBACK 2
 54 |  64 |
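A bucket can be served straight from the cache only in the VALID, DIRTY and WRITING states; LOADING means a read from the blocker is already in flight, and INVALID means the data is simply not there yet. A minimal sketch of how the states and the bucket_readable() macro above are meant to be used (illustrative only, not part of the revision):

    /* Illustrative use of the bucket states defined above. */
    uint32_t status = LOADING;

    if (bucket_readable(status)) {
        /* VALID, DIRTY or WRITING: the bucket data is in the cache */
    } else if (status == LOADING) {
        /* a read for this bucket is already pending: wait on its waitq */
    } else {
        /* INVALID: issue a read to the blocker and mark the bucket LOADING */
    }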
 55 |     | struct cache_entry {
 56 |     |     xlock lock;
 57 |     |     uint32_t ref;
 58 |     |     unsigned char *data;
 59 |     |     char name[XSEG_MAX_TARGETLEN + 1];
 60 |     |     uint32_t *status //len: object_size/bucket_size
 61 |     | };
    |  65 |
    |  66 | /* object states */
    |  67 | #define INVALIDATED (1 << 0)
    |  68 |
    |  69 |
    |  70 | /* cio states */
    |  71 | #define CIO_FAILED 1
    |  72 | #define CIO_ACCEPTED 2
    |  73 | #define CIO_READING 3
    |  74 |
    |  75 | #define BUCKET_SIZE_QUANTUM 4096
 62 |  76 |
 63 |  77 | struct cache_io {
    |  78 |     uint32_t state;
    |  79 |     xcache_handler h;
    |  80 |     uint32_t pending_reqs;
    |  81 |     struct work work;
 64 |  82 | };
 65 |  83 |
 66 |     | struct cache {
 67 |     |     xlock lock;
    |  84 | struct cached {
    |  85 |     xport bportno;
 68 |  86 |     uint32_t cache_size;
    |  87 |     struct xcache *cache;
    |  88 |     uint64_t max_req_size;
 69 |  89 |     uint64_t object_size;
 70 |     |     uint64_t bucket_per_object;
    |  90 |     uint32_t bucket_size;
    |  91 |     uint32_t buckets_per_object;
 71 |  92 |     int write_policy;
 72 |     |     struct xq free_nodes;
 73 |     |     xhash_t *entries;
 74 |     |     struct cache_entry *nodes;
    |  93 |     //scheduler
 75 |  94 | };
 76 |  95 |
 77 |     | struct cached {
 78 |     |     xport bp;
 79 |     |     uint64_t max_req_size;
 80 |     |     struct cache_io *ios;
 81 |     |     struct xq free_ios;
 82 |     |     //scheduler
    |  96 | struct ce {
    |  97 |     unsigned char *data;
    |  98 |     uint32_t *status;
    |  99 |     struct xwaitq *waitq;
    | 100 |     uint32_t flags;
    | 101 |     struct xlock lock;
    | 102 |     struct xworkq workq;
 83 | 103 | };
 84 | 104 |
 85 | 105 |
... | ... |
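Each ce holds one flat object buffer plus a per-bucket status array, so bucket i of an object occupies a fixed bucket_size slice of ce->data. A hypothetical helper (bucket_data() is not part of the revision) that spells out this layout:

    /* Hypothetical helper, assuming the struct cached / struct ce above. */
    static inline unsigned char *bucket_data(struct cached *cached,
            struct ce *ce, uint32_t bucket)
    {
        /* bucket i starts i * bucket_size bytes into the object's buffer */
        return ce->data + (uint64_t)bucket * cached->bucket_size;
    }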
 97 | 117 |     return (struct cache_io *) pr->priv;
 98 | 118 | }
 99 | 119 |
    | 120 | static uint32_t __get_bucket(struct cached *cache, uint64_t offset)
    | 121 | {
    | 122 |     return (offset / cache->bucket_size);
    | 123 | }
100 | 124 |
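__get_bucket() maps a byte offset inside an object to a bucket index by dividing by the bucket size. A quick worked example, assuming the default 4096-byte buckets (illustrative only, not part of the revision):

    /* With cached->bucket_size == 4096 (BUCKET_SIZE_QUANTUM): */
    assert(__get_bucket(cached, 0)     == 0);
    assert(__get_bucket(cached, 4095)  == 0);
    assert(__get_bucket(cached, 4096)  == 1);
    assert(__get_bucket(cached, 12288) == 3);
    /* handle_read() uses it for both ends of a request:
     * b = __get_bucket(cached, req->offset) and
     * limit = __get_bucket(cached, req->offset + req->size). */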
101 |     | void cache_entry_get(struct cache *cache, xqindex idx)
    | 125 | static int is_loading(void *arg)
102 | 126 | {
103 |     |     struct cache_entry *ce = cache->entries[idx];
104 |     |     ce->ref++;
    | 127 |     uint32_t *status = (uint32_t *)arg;
    | 128 |     return (*status == LOADING);
105 | 129 | }
106 | 130 |
107 |     | void cache_entry_put()
    | 131 | static int rw_range(struct peerd *peer, struct peer_req *pr, int action,
    | 132 |         uint32_t start, uint32_t end)
108 | 133 | {
109 |     |     struct cache_entry *ce = cache->entries[idx];
110 |     |     ce->ref--;
111 |     |     if (!ce->ref){
112 |     |         //write dirty buckets
113 |     |         //if no pending
114 |     |         // free cache entry
115 |     |     }
    | 134 |     struct cached *cached = __get_cached(peer);
    | 135 |     struct cache_io *cio = __get_cache_io(pr);
    | 136 |     struct ce *ce = get_cache_entry(cached->cache, cio->h);
    | 137 |
    | 138 |     return 0;
116 | 139 | }
117 | 140 |
118 |     | int cache_entry_init(struct cache *cache, xqindex idx, char *name)
    | 141 |
    | 142 | int on_init(void *c, void *e)
119 | 143 | {
120 |     |     struct cache_entry *ce = cache->entries[idx];
121 |     |     xlock_release(&ce->lock);
122 |     |     ce->ref = 1;
123 |     |     memset(ce->data, 0, cache->object_size);
124 |     |     for (i = 0; i < cache->bucket_per_object; i++) {
    | 144 |     uint32_t i;
    | 145 |     struct cached *cached = (struct cached *)c;
    | 146 |     struct ce *ce = (struct ce *)e;
    | 147 |     ce->flags = 0;
    | 148 |     memset(ce->data, 0, cached->object_size);
    | 149 |     for (i = 0; i < cached->buckets_per_object; i++) {
125 | 150 |         ce->status[i] = INVALID;
126 | 151 |     }
127 |     |     strncpy(ce->name, name, XSEG_MAX_TARGETLEN);
128 |     |     ce->name[XSEG_MAX_TARGETLEN] = 0;
    | 152 |     xlock_release(&ce->lock);
129 | 153 |     return 0;
130 | 154 | }
131 | 155 |
132 |     | int cache_init(struct cache *cache, uint32_t cache_size, uint64_t object_size,
133 |     |         uint64_t bucket_size, int write_policy)
    | 156 | void on_put(void *c, void *e)
134 | 157 | {
135 |     |     unsigned long nr_nodes = cache_size * 2;
136 |     |     xlock_release(&cache->lock);
137 |     |     cache->object_size = object_size;
138 |     |     cache->bucket_per_object = object_size/bucket_size;
139 |     |     cache->write_policy = write_policy;
140 |     |     cache->size = cache_size;
141 |     |     if (!xq_alloc_seq(&cache->free_nodes, nr_nodes, nr_nodes)){
142 |     |         return -1;
    | 158 |     struct cached *cached = (struct cached *)c;
    | 159 |     struct ce *ce = (struct ce *)e;
    | 160 |     //since we are the last referrer to the cache entry
    | 161 |     //no lock is needed.
    | 162 |
    | 163 |     uint32_t start, end, i = 0;
    | 164 |     if (cached->write_policy == WRITETHROUGH || ce->flags & INVALIDATED)
    | 165 |         return;
    | 166 |     //write all dirty buckets.
    | 167 |     while (i < cached->buckets_per_object){
    | 168 |         if (ce->status[i] != DIRTY){
    | 169 |             i++;
    | 170 |             continue;
    | 171 |         }
    | 172 |         start = i;
    | 173 |         while (i < cached->buckets_per_object &&
    | 174 |                 (i-start)*cached->bucket_size < cached->max_req_size &&
    | 175 |                 ce->status[i] == DIRTY){
    | 176 |             i++;
    | 177 |         }
    | 178 |         end = i;
    | 179 |         //problem: no associated pr
    | 180 |         //maybe put one in cache entry
    | 181 |         rw_range(cached, ce, 1, start, end);
143 | 182 |     }
144 |     |     cache->entries = xhash_new(shift, cache_size, STRING);
145 |     |     if (!cache->entries){
146 |     |         return -1;
    | 183 | }
    | 184 |
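on_put() above flushes dirty data in runs: it scans for the first DIRTY bucket, extends the run while buckets stay DIRTY and the run is still below max_req_size, and hands each run to rw_range() as one write. A worked example of the grouping, assuming 4096-byte buckets and a 512 KiB max_req_size (illustrative only, not part of the revision):

    /*
     * status[] = { VALID, DIRTY, DIRTY, DIRTY, VALID, DIRTY, ... }
     *
     * run 1: the outer loop stops at i = 1 (first DIRTY bucket), start = 1;
     *        the inner loop advances i to 4, where status[4] != DIRTY,
     *        so end = 4 and rw_range(cached, ce, 1, 1, 4) flushes the
     *        dirty run covering buckets 1..3.
     * run 2: scanning resumes at i = 4 and picks up the run starting at 5.
     *
     * With bucket_size = 4096 and max_req_size = 512 * 1024, the
     * (i - start) * bucket_size < max_req_size check cuts a run after
     * at most 128 buckets, even if more consecutive buckets are DIRTY.
     */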
    | 185 | void * init_node(void *c)
    | 186 | {
    | 187 |     int i;
    | 188 |     //TODO err check
    | 189 |     struct cached *cached = (struct cached *)c;
    | 190 |     struct ce *ce = malloc(sizeof(struct ce));
    | 191 |     xlock_release(&ce->lock);
    | 192 |     ce->data = malloc(sizeof(unsigned char) * cached->object_size);
    | 193 |     ce->status = malloc(sizeof(uint32_t) * cached->buckets_per_object);
    | 194 |     ce->waitq = malloc(sizeof(struct xwaitq) * cached->buckets_per_object);
    | 195 |     for (i = 0; i < cached->buckets_per_object; i++) {
    | 196 |         xwaitq_init(&ce->waitq[i], is_loading, &ce->status[i],
    | 197 |                 XWAIT_SIGNAL_ONE);
147 | 198 |     }
148 |     |     cache->nodes = malloc(sizeof(struct cache_entry) * nr_nodes);
149 |     |     if (!cache->nodes){
150 |     |         return -1;
    | 199 |     xworkq_init(&ce->workq, &ce->lock, 0);
    | 200 |     return ce;
    | 201 | }
    | 202 |
    | 203 | struct xcache_ops c_ops = {
    | 204 |     .on_init = on_init,
    | 205 |     .on_put = on_put,
    | 206 |     .on_node_init = init_node
    | 207 | };
    | 208 |
    | 209 | static uint32_t __get_next_invalid(struct ce *ce, uint32_t start,
    | 210 |         uint32_t limit)
    | 211 | {
    | 212 |     uint32_t end = start+1;
    | 213 |     while (end <= limit && ce->status[end] == INVALID)
    | 214 |         end++;
    | 215 |     return end;
    | 216 | }
    | 217 |
    | 218 | static void cached_fail(struct peerd *peer, struct peer_req *pr)
    | 219 | {
    | 220 |     struct cached *cached = __get_cached(peer);
    | 221 |     struct cache_io *cio = __get_cache_io(pr);
    | 222 |     if (cio->h != NoEntry){
    | 223 |         xcache_put(cached->cache, cio->h);
151 | 224 |     }
    | 225 |     cio->h = NoEntry;
    | 226 |     fail(peer, pr);
    | 227 | }
152 | 228 |
153 |     |     return 0;
    | 229 | static void cached_complete(struct peerd *peer, struct peer_req *pr)
    | 230 | {
    | 231 |     struct cached *cached = __get_cached(peer);
    | 232 |     struct cache_io *cio = __get_cache_io(pr);
    | 233 |     if (cio->h != NoEntry){
    | 234 |         xcache_put(cached->cache, cio->h);
    | 235 |     }
    | 236 |     cio->h = NoEntry;
    | 237 |     complete(peer, pr);
154 | 238 | }
155 | 239 |
156 |     | xqindex cache_lookup(struct cache *cache, char *name)
    | 240 | static void handle_read(void *arg);
    | 241 | //is this necessary?
    | 242 | static void status_changed(void *arg)
157 | 243 | {
158 |     |     return Noneidx;
    | 244 |     /*
    | 245 |      * In this context we hold a reference to the cache entry.
    | 246 |      *
    | 247 |      * This function gets called only after the bucket at which the
    | 248 |      * current peer_req is waiting has finished loading or failed.
    | 249 |      *
    | 250 |      * Assumptions:
    | 251 |      * Each pr waits only at one bucket at any time. That means that
    | 252 |      * under no circumstances does this function get called simultaneously
    | 253 |      * for the same pr.
    | 254 |      */
    | 255 |     struct peer_req *pr = (struct peer_req *)arg;
    | 256 |     struct peerd *peer = pr->peer;
    | 257 |     struct cached *cached = __get_cached(peer);
    | 258 |     struct cache_io *cio = __get_cache_io(pr);
    | 259 |     struct ce *ce = get_cache_entry(cached->cache, cio->h);
    | 260 |
    | 261 |     if (xworkq_enqueue(&ce->workq, handle_read, (void *)pr) < 0){
    | 262 |         //FAIL or mark as failed? are we the last?
    | 263 |         if (cio->pending_reqs){
    | 264 |             // cannot put here, since there are outstanding reqs to
    | 265 |             // be received.
    | 266 |             // Simply mark pr as failed.
    | 267 |             cio->state = CIO_FAILED;
    | 268 |         } else {
    | 269 |             //safe to fail here, since there is no pending action on
    | 270 |             //this pr.
    | 271 |             cached_fail(peer, pr);
    | 272 |         }
    | 273 |     }
159 | 274 | }
160 | 275 |
161 |     | int __cache_insert(struct cache *cache, xqindex *idx)
    | 276 | static void handle_read(void *arg)
162 | 277 | {
163 |     |     return -1;
    | 278 |     /*
    | 279 |      * In this context we hold a reference to the cache entry and
    | 280 |      * the associated lock
    | 281 |      */
    | 282 |
    | 283 |     struct peer_req *pr = (struct peer_req *)arg;
    | 284 |     struct peerd *peer = pr->peer;
    | 285 |     struct cached *cached = __get_cached(peer);
    | 286 |     struct cache_io *cio = __get_cache_io(pr);
    | 287 |     struct xseg_request *req = pr->req;
    | 288 |     struct ce *ce = get_cache_entry(cached->cache, cio->h);
    | 289 |
    | 290 |     uint32_t start_bucket, end_bucket, next;
    | 291 |     uint32_t i, b, limit;
    | 292 |
    | 293 |     uint32_t pending_buckets = 0;
    | 294 |
    | 295 |     if (cio->state == CIO_FAILED)
    | 296 |         goto out;
    | 297 |
    | 298 |     b = __get_bucket(cached, req->offset);
    | 299 |     limit = __get_bucket(cached, req->offset + req->size);
    | 300 |     //assert limit < cached->object_size
    | 301 |
    | 302 |     for (i = b; i < limit; i++) {
    | 303 |         if (bucket_readable(ce->status[i]))
    | 304 |             continue;
    | 305 |         if (ce->status[i] != LOADING){
    | 306 |             start_bucket = i;
    | 307 |             next = __get_next_invalid(ce, start_bucket, limit);
    | 308 |             end_bucket = next - 1;
    | 309 |             i = next;
    | 310 |             if (rw_range(peer, pr, 0, start_bucket, end_bucket) < 0){
    | 311 |                 cio->state = CIO_FAILED;
    | 312 |                 break;
    | 313 |             }
    | 314 |             cio->pending_reqs++;
    | 315 |             cio->state = CIO_READING;
    | 316 |         }
    | 317 |         pending_buckets++;
    | 318 |     }
    | 319 |
    | 320 |     if (pending_buckets) {
    | 321 |         /* Do not put cache entry yet */
    | 322 |         cio->work.job_fn = handle_read;
    | 323 |         cio->work.job = pr;
    | 324 |         /* wait on the last bucket */
    | 325 |         xwaitq_enqueue(&ce->waitq[end_bucket], &cio->work);
    | 326 |         return;
    | 327 |     }
    | 328 |
    | 329 | out:
    | 330 |     if (cio->state == CIO_FAILED){
    | 331 |         if (!cio->pending_reqs)
    | 332 |             cached_fail(peer, pr);
    | 333 |     }
    | 334 |     else {
    | 335 |         //serve req;
    | 336 |         cached_complete(peer, pr);
    | 337 |     }
    | 338 |     return;
164 | 339 | }
165 | 340 |
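handle_read() above walks the buckets spanned by the request, skips the ones that are already readable, coalesces consecutive INVALID buckets into a single rw_range() read via __get_next_invalid(), and counts every not-yet-readable bucket in pending_buckets; if any remain, the cache_io is parked as cio->work on the waitq of the last bucket of the issued range so handle_read() is re-run once that bucket stops LOADING. A worked example, assuming 4096-byte buckets (illustrative numbers only, not part of the revision):

    /*
     * req->offset = 8192, req->size = 16384, bucket_size = 4096:
     *   b     = __get_bucket(cached, 8192)  = 2
     *   limit = __get_bucket(cached, 24576) = 6
     * so buckets 2..5 are examined.
     *
     * Say buckets 2, 3 and 6 are VALID and buckets 4 and 5 are INVALID:
     *   - bucket 4 starts a run; __get_next_invalid(ce, 4, 6) returns 6,
     *     so end_bucket = 5 and one read is issued with
     *     rw_range(peer, pr, 0, 4, 5);
     *   - pending_buckets is non-zero, so cio->work is queued on
     *     ce->waitq[5]; once complete_read() marks the buckets VALID and
     *     signals that waitq, handle_read() runs again and completes the pr.
     */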
166 |     | int cache_insert(struct cache *cache, xqindex *idx)
    | 341 | static void handle_write(void *arg)
167 | 342 | {
    | 343 |     //if writeback
    | 344 |     //  for each bucket
    | 345 |     //      write all buckets
    | 346 |     //      mark them as dirty
    | 347 |     //  cache_put(h)
    | 348 |     //  complete
    | 349 |     //else
    | 350 |     //  send write to blocker
    | 351 |
    | 352 |     /*
    | 353 |      * In this context we hold a reference to the cache entry and
    | 354 |      * the associated lock
    | 355 |      */
    | 356 |
168 | 357 |     int r;
169 |     |     xlock_acquire(&cache->lock, 1);
170 |     |     r = __cache_insert(cache, idx);
171 |     |     xlock_release(&cache->lock);
    | 358 |     struct peer_req *pr = (struct peer_req *)arg;
    | 359 |     struct peerd *peer = pr->peer;
    | 360 |     struct cached *cached = __get_cached(peer);
    | 361 |     struct cache_io *cio = __get_cache_io(pr);
    | 362 |     struct ce *ce = get_cache_entry(cached->cache, cio->h);
    | 363 |     (void)ce;
    | 364 |
    | 365 |     if (cached->write_policy == WRITETHROUGH){
    | 366 |         //send write to blocker
    | 367 |         //return
    | 368 |     } else if (cached->write_policy == WRITEBACK) {
    | 369 |         //for each bucket
    | 370 |         //  write all buckets
    | 371 |         //  mark them as dirty
    | 372 |         r = 0;
    | 373 |     } else {
    | 374 |         r = -1;
    | 375 |     }
    | 376 |
    | 377 | out:
    | 378 |     if (r < 0)
    | 379 |         cached_fail(peer, pr);
    | 380 |     else
    | 381 |         cached_complete(peer, pr);
    | 382 |     return;
    | 383 | }
    | 384 |
    | 385 | static int handle_readwrite(struct peerd *peer, struct peer_req *pr)
    | 386 | {
    | 387 |     int r = -1;
    | 388 |     struct ce *ce;
    | 389 |     struct cached *cached = __get_cached(peer);
    | 390 |     char name[XSEG_MAX_TARGETLEN + 1];
    | 391 |     struct xseg_request *req = pr->req;
    | 392 |     char *target = xseg_get_target(peer->xseg, req);
    | 393 |
    | 394 |     xcache_handler h = NoEntry;
    | 395 |
    | 396 |     strncpy(name, target, req->targetlen);
    | 397 |     name[XSEG_MAX_TARGETLEN] = 0;
    | 398 |
    | 399 |     h = xcache_lookup(cached->cache, name);
    | 400 |     if (h == NoEntry){
    | 401 |         h = xcache_alloc_init(cached->cache, name);
    | 402 |         if (h == NoEntry){
    | 403 |             goto out;
    | 404 |         }
    | 405 |         r = xcache_insert(cached->cache, h);
    | 406 |         if (r < 0){
    | 407 |             goto out;
    | 408 |         }
    | 409 |     }
    | 410 |
    | 411 |     ce = (struct ce *)get_cache_entry(cached->cache, h);
    | 412 |     if (!ce){
    | 413 |         r = -1;
    | 414 |         goto out;
    | 415 |     }
    | 416 |
    | 417 |     if (req->op == X_WRITE)
    | 418 |         r = xworkq_enqueue(&ce->workq, handle_write, (void *)pr);
    | 419 |     else if (req->op == X_READ)
    | 420 |         r = xworkq_enqueue(&ce->workq, handle_read, (void *)pr);
    | 421 |     else {
    | 422 |         r = -1;
    | 423 |         goto out;
    | 424 |     }
    | 425 |
    | 426 | out:
    | 427 |     if (r < 0){
    | 428 |         cached_fail(peer, pr);
    | 429 |     }
172 | 430 |     return r;
    | 431 |
173 | 432 | }
174 | 433 |
175 |     | int __cache_remove(struct cache *cache, xqindex *idx)
    | 434 | struct req_completion{
    | 435 |     struct peer_req *pr;
    | 436 |     struct xseg_request *req;
    | 437 | };
    | 438 |
    | 439 | static void complete_read(void *arg)
176 | 440 | {
177 |     |     struct cache_entry *ce = cache->entries[idx];
178 |     |     return xhash_delete(cache->entries, (xhashidx)ce->name);
    | 441 |     /*
    | 442 |      * In this context we hold a reference to the cache entry and
    | 443 |      * the associated lock
    | 444 |      */
    | 445 |
    | 446 |     struct req_completion *rc = (struct req_completion *)arg;
    | 447 |     struct peer_req *pr = rc->pr;
    | 448 |     struct xseg_request *req = rc->req;
    | 449 |     struct peerd *peer = pr->peer;
    | 450 |     struct cached *cached = __get_cached(peer);
    | 451 |     struct cache_io *cio = __get_cache_io(pr);
    | 452 |     struct ce *ce = get_cache_entry(cached->cache, cio->h);
    | 453 |     uint32_t start, end, i;
    | 454 |     int success;
    | 455 |     char *data = xseg_get_data(peer->xseg, req);
    | 456 |
    | 457 |     /*
    | 458 |      * Synchronize pending_reqs of the cache_io here, since each cache_io
    | 459 |      * refers to only one object, and therefore we can use the object lock
    | 460 |      * to synchronize between receive contexts.
    | 461 |      */
    | 462 |     cio->pending_reqs--;
    | 463 |     success = (req->state == XS_SERVED && req->serviced == req->size);
    | 464 |     if (!success)
    | 465 |         cio->state = CIO_FAILED;
    | 466 |     //assert (req->offset % cached->bucket_size) == 0;
    | 467 |     //assert ((req->offset+req->serviced) % cached->bucket_size) == 0;
    | 468 |     start = __get_bucket(cached, req->offset);
    | 469 |     end = __get_bucket(cached, req->offset + req->serviced);
    | 470 |     for (i = start; i < end; i++) {
    | 471 |         if (ce->status[i] == LOADING){
    | 472 |             if (success){
    | 473 |                 memcpy(ce->data+(i*cached->bucket_size), data,
    | 474 |                         cached->bucket_size);
    | 475 |                 ce->status[i] = VALID;
    | 476 |             }
    | 477 |             else {
    | 478 |                 //reset status
    | 479 |                 ce->status[i] = INVALID;
    | 480 |             }
    | 481 |             xwaitq_signal(&ce->waitq[i]);
    | 482 |         }
    | 483 |     }
    | 484 |     free(rc);
179 | 485 | }
180 | 486 |
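complete_read() above translates a finished blocker read back into bucket state: the bucket range is derived from req->offset and req->serviced, every bucket that was LOADING gets its data copied in and becomes VALID (or falls back to INVALID on failure), and its waitq is signalled so any parked handle_read() work is re-queued. A small worked example of the range arithmetic, assuming 4096-byte buckets (illustrative only, not part of the revision):

    /*
     * The blocker request covered req->offset = 16384 and was fully
     * serviced with req->serviced = 8192:
     *   start = __get_bucket(cached, 16384) = 4
     *   end   = __get_bucket(cached, 24576) = 6
     * Buckets 4 and 5 are updated: each one still marked LOADING is
     * filled, set to VALID, and xwaitq_signal(&ce->waitq[i]) wakes the
     * peer_req that was parked on it.
     */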
181 |     | int cache_remove(struct cache *cache, xqindex *idx)
    | 487 | void complete_write(void *arg)
182 | 488 | {
183 |     |     int r;
184 |     |     xlock_acquire(&cache->lock, 1);
185 |     |     r = __cache_remove(cache, idx);
186 |     |     xlock_release(&cache->lock);
187 |     |     return r;
    | 489 |     //for each bucket
    | 490 |     //  if WRITETHROUGH
    | 491 |     //      copy data to bucket
    | 492 |     //      mark as valid
    | 493 |     //  else if WRITEBACK
    | 494 |     //      if status writing
    | 495 |     //          mark as valid
    | 496 |     //
    | 497 |     /*
    | 498 |      * In this context we hold a reference to the cache entry and
    | 499 |      * the associated lock
    | 500 |      */
    | 501 |     return;
188 | 502 | }
189 | 503 |
190 |     | int cache_invalidate(struct cache *cache, char *name)
    | 504 | static int handle_receive_read(struct peerd *peer, struct peer_req *pr,
    | 505 |         struct xseg_request *req)
191 | 506 | {
    | 507 |     /*
    | 508 |      * Should be reentrant
    | 509 |      */
    | 510 |     struct cached *cached = __get_cached(peer);
    | 511 |     struct cache_io *cio = __get_cache_io(pr);
    | 512 |     struct ce *ce = get_cache_entry(cached->cache, cio->h);
    | 513 |
    | 514 |     struct req_completion *rc;
    | 515 |
    | 516 |     rc = malloc(sizeof(struct req_completion));
    | 517 |
    | 518 |     rc->pr = pr;
    | 519 |     rc->req = req;
    | 520 |     if (xworkq_enqueue(&ce->workq, complete_read, (void *)rc) < 0){
    | 521 |         free(rc);
    | 522 |         //TODO WHAT?
    | 523 |     }
    | 524 |     return 0;
    | 525 | }
    | 526 |
    | 527 | static int handle_receive_write(struct peerd *peer, struct peer_req *pr)
    | 528 | {
    | 529 |     //enqueue_work
    | 530 |     return 0;
    | 531 | }
    | 532 |
    | 533 | static int handle_delete(struct peerd *peer, struct peer_req *pr)
    | 534 | {
    | 535 |     //h = cache_lookup
    | 536 |     //if h
    | 537 |     //  cio->h = h
    | 538 |     //
    | 539 |     //send delete to blocker
    | 540 |     return 0;
    | 541 | }
    | 542 |
    | 543 | static int handle_receive_delete(struct peerd *peer, struct peer_req *pr)
    | 544 | {
    | 545 |     //if success
    | 546 |     //  if cio->h
    | 547 |     //      //this should not write any dirty data
    | 548 |     //      xcache_remove(h)
    | 549 |     return 0;
    | 550 | }
    | 551 |
    | 552 | static int forward_req(struct peerd *peer, struct peer_req *pr)
    | 553 | {
    | 554 |     //get request
    | 555 |     //hijack target and data
    | 556 |     //submit
    | 557 |     return 0;
    | 558 | }
    | 559 |
    | 560 | static int handle_receive(struct peerd *peer, struct peer_req *pr,
    | 561 |         struct xseg_request *req)
    | 562 | {
    | 563 |     //if not read/write/delete
    | 564 |     //  put req;
    | 565 |     //  complete or fail pr;
    | 566 |     return 0;
192 | 567 | }
193 | 568 |
194 | 569 | int dispatch(struct peerd *peer, struct peer_req *pr, struct xseg_request *req,
195 | 570 |         enum dispatch_reason reason)
196 | 571 | {
197 |     |     struct cached *cacher = __get_cached(peer);
198 |     |     (void) cacher;
    | 572 |     struct cached *cached = __get_cached(peer);
    | 573 |     (void) cached;
199 | 574 |     struct cache_io *cio = __get_cache_io(pr);
200 | 575 |     (void) cio;
201 | 576 |
202 | 577 |
203 |     |     switch reason {
    | 578 |     switch (reason) {
204 | 579 |         case dispatch_accept:
    | 580 |             cio->state = CIO_ACCEPTED;
205 | 581 |             break;
206 | 582 |         case dispatch_receive:
207 | 583 |             break;
... | ... |
217 | 593 |     int i;
218 | 594 |
219 | 595 |     //FIXME error checks
220 |     |     struct cacherd *cacherd = malloc(sizeof(struct cacherd));
221 |     |     peer->priv = cacherd;
222 |     |     cacher = cacherd;
223 |     |     cacher->hashmaps = xhash_new(3, STRING);
    | 596 |     struct cached *cached = malloc(sizeof(struct cached));
    | 597 |     cached->cache = malloc(sizeof(struct xcache));
    | 598 |     xcache_init(cached->cache, cached->cache_size, &c_ops, peer);
    | 599 |
    | 600 |     peer->priv = cached;
224 | 601 |
225 | 602 |     for (i = 0; i < peer->nr_ops; i++) {
226 |     |         struct mapper_io *mio = malloc(sizeof(struct mapper_io));
227 |     |         mio->copyups_nodes = xhash_new(3, INTEGER);
228 |     |         mio->copyups = 0;
229 |     |         mio->err = 0;
230 |     |         mio->active = 0;
    | 603 |         struct cache_io *mio = malloc(sizeof(struct cache_io));
231 | 604 |         peer->peer_reqs[i].priv = mio;
232 | 605 |     }
233 | 606 |
234 |     |     mapper->bportno = -1;
235 |     |     mapper->mbportno = -1;
    | 607 |
    | 608 |     uint32_t cache_size;
    | 609 |     unsigned long bucket_size;
    | 610 |     unsigned long object_size;
    | 611 |     unsigned long max_req_size;
    | 612 |
    | 613 |     cached->bportno = -1;
    | 614 |     cached->cache_size = -1;
    | 615 |     cached->max_req_size = -1;
    | 616 |     cached->object_size = -1;
    | 617 |     cached->bucket_size = -1;
    | 618 |     cached->write_policy = WRITETHROUGH;
    | 619 |
236 | 620 |     BEGIN_READ_ARGS(argc, argv);
237 |     |     READ_ARG_ULONG("-bp", mapper->bportno);
238 |     |     READ_ARG_ULONG("-mbp", mapper->mbportno);
    | 621 |     READ_ARG_ULONG("-bp", cached->bportno);
    | 622 |     READ_ARG_ULONG("-cs", cached->cache_size);
    | 623 |     READ_ARG_ULONG("-mrs", max_req_size);
    | 624 |     READ_ARG_ULONG("-os", object_size); // In buckets
    | 625 |     READ_ARG_ULONG("-bs", bucket_size); // In BUCKET_SIZE_QUANTUM
    | 626 |     // READ_ARG_ULONG("-wcp", cached->write_policy);
239 | 627 |     END_READ_ARGS();
240 |     |     if (mapper->bportno == -1){
    | 628 |     cached->bucket_size = BUCKET_SIZE_QUANTUM;
    | 629 |     cached->max_req_size = 512 * 1024;
    | 630 |     cached->object_size = 4 * 1024 * 1024;
    | 631 |     if (cached->bportno == -1){
241 | 632 |         XSEGLOG2(&lc, E, "Portno for blocker must be provided");
242 | 633 |         usage(argv[0]);
243 | 634 |         return -1;
244 | 635 |     }
245 |     |     if (mapper->mbportno == -1){
246 |     |         XSEGLOG2(&lc, E, "Portno for mblocker must be provided");
247 |     |         usage(argv[0]);
248 |     |         return -1;
249 |     |     }
250 | 636 |
    | 637 |     cached->buckets_per_object = cached->object_size / cached->bucket_size;
251 | 638 |     return 0;
252 | 639 | }
253 | 640 |
254 | 641 | void custom_peer_finalize(struct peerd *peer)
255 | 642 | {
256 | 643 |     //write dirty objects
    | 644 |     //or cache_close(cached->cache);
257 | 645 |     return;
258 | 646 | }
259 | 647 |
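With the defaults hard-coded in custom_peer_init() above, the cache geometry works out as follows (illustrative arithmetic; the -os, -bs and -mrs arguments would change it):

    /*
     * bucket_size        = BUCKET_SIZE_QUANTUM       = 4096 bytes
     * object_size        = 4 * 1024 * 1024           = 4 MiB
     * buckets_per_object = object_size / bucket_size = 1024
     * max_req_size       = 512 * 1024                = 512 KiB,
     *                      i.e. at most 128 buckets per flush request
     */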