Add xcache xtype implementation
[archipelago] / xseg / xtypes / xcache.c
1 /*
2  * Copyright 2013 GRNET S.A. All rights reserved.
3  *
4  * Redistribution and use in source and binary forms, with or
5  * without modification, are permitted provided that the following
6  * conditions are met:
7  *
8  *   1. Redistributions of source code must retain the above
9  *      copyright notice, this list of conditions and the following
10  *      disclaimer.
11  *   2. Redistributions in binary form must reproduce the above
12  *      copyright notice, this list of conditions and the following
13  *      disclaimer in the documentation and/or other materials
14  *      provided with the distribution.
15  *
16  * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17  * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18  * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19  * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20  * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23  * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24  * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25  * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26  * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27  * POSSIBILITY OF SUCH DAMAGE.
28  *
29  * The views and conclusions contained in the software and
30  * documentation are those of the authors and should not be
31  * interpreted as representing official policies, either expressed
32  * or implied, of GRNET S.A.
33  */
34
35 #include <xtypes/xcache.h>
36 //TODO container aware and xptrs
37
38 #if 0
39 static struct xcache_entry * get_cache_entry(struct xcache *cache, xqindex idx)
40 {
41    return &cache->entries[idx];
42 }
43
44 static xqindex __get_cache_idx(struct xcache *cache, struct xcache_entry *ce)
45 {
46         return (xqindex)(ce - cache->nodes);
47 }
48
49 static xqindex __alloc_cache_entry(struct xcache *cache)
50 {
51         return __xq_pop_head(&cache->free_nodes);
52 }
53
54 static void __free_cache_entry(struct xcache *cache, xqindex idx)
55 {
56         if (__xq_append_head(&cache->free_nodes, idx) == Noneidx)
57                 XSEGLOG("BUG: Could not free cache entry node. Queue is full");
58 }
59 #endif
60
61 /* table helper functions */
62 static xcache_handler __table_lookup(xhash_t *table, char *name)
63 {
64         xqindex xqi = Noneidx;
65         if (xhash_lookup(table, (xhashidx)name, &xqi) < 0)
66                 return NoEntry;
67         return (xcache_handler)xqi;
68 }
69
70 static int __table_insert(xhash_t **table, struct xcache * cache, xcache_handler h)
71 {
72         xhash_t *new;
73         xqindex idx = (xqindex)h;
74         struct xcache_entry *ce = &cache->nodes[idx];
75         int r = 0;
76
77         do {
78                 r = xhash_insert(*table, (xhashidx)ce->name, idx);
79                 if (r == -XHASH_ERESIZE){
80                         XSEGLOG("Rebuilding internal hash table");
81                         new = xhash_resize(*table,
82                                         cache->rm_entries->size_shift,
83                                         cache->rm_entries->limit, NULL);
84                         if (!new) {
85                                 XSEGLOG("Error resizing hash table");
86                                 return -1;
87                         }
88                         *table = new;
89                 }
90         } while (r == -XHASH_ERESIZE);
91
92         return r;
93 }
94
95 static int __table_remove(xhash_t *table, char *name)
96 {
97         int r;
98
99         r = xhash_delete(table, (xhashidx)name);
100         if (UNLIKELY(r<0)){
101                 if (r == -XHASH_ERESIZE)
102                         XSEGLOG("BUG: hash table must be resized");
103                 else if (r == -XHASH_EEXIST)
104                         XSEGLOG("BUG: Entry %s not found in hash table", name);
105         }
106         return r;
107 }
108
109
110
111
112 static xqindex alloc_cache_entry(struct xcache *cache)
113 {
114         return xq_pop_head(&cache->free_nodes, 1);
115 }
116
117 static void __free_cache_entry(struct xcache *cache, xqindex idx)
118 {
119         if (UNLIKELY(xq_append_head(&cache->free_nodes, idx, 1) == Noneidx))
120                 XSEGLOG("BUG: Could not free cache entry node. Queue is full");
121 }
122
123 static void free_cache_entry(struct xcache *cache, xqindex idx)
124 {
125         struct xcache_entry *ce = &cache->nodes[idx];
126         __free_cache_entry(cache, idx);
127         if (cache->ops.on_free)
128                 cache->ops.on_free(cache->priv, ce->priv);
129 }
130
131 static xqindex __count_free_nodes(struct xcache *cache)
132 {
133         return xq_count(&cache->free_nodes);
134 }
135
136 static void __reset_times(struct xcache *cache)
137 {
138         uint32_t i;
139         struct xcache_entry *ce;
140         xbinheapidx time;
141
142         /* assert thatn cache->time does not get MAX value. If this happens, add
143          * one more, to overflow time and return to zero.
144          */
145         if (cache->flags & XCACHE_LRU_ARRAY){
146                 for (i = 0; i < cache->size; i++) {
147                         if (cache->times[i] != XCACHE_LRU_MAX)
148                                 cache->times[i] = cache->time++;
149                 }
150         } else if (cache->flags & XCACHE_LRU_HEAP) {
151                 for (i = 0; i < cache->size; i++) {
152                         ce = &cache->nodes[i];
153                         if (ce->h == NoNode)
154                                 continue;
155                         time = xbinheap_getkey(&cache->binheap, ce->h);
156                         if (time < cache->time)
157                                 xbinheap_increasekey(&cache->binheap, ce->h, cache->time);
158                         else
159                                 xbinheap_decreasekey(&cache->binheap, ce->h, cache->time);
160                 }
161         }
162 }
163
164 /*
165  * xbinheap should be protected by cache lock.
166  */
167 static void __update_access_time(struct xcache *cache, xqindex idx)
168 {
169         struct xcache_entry *ce = &cache->nodes[idx];
170
171         /* assert thatn cache->time does not get MAX value. If this happen,
172          * reset it to zero, and also reset all access times.
173          */
174         cache->time++;
175         if (cache->time == XCACHE_LRU_MAX) {
176                 cache->time = 0;
177                 __reset_times(cache);
178                 return;
179         }
180
181         if (cache->flags & XCACHE_LRU_ARRAY){
182                 cache->times[idx] = cache->time;
183         } else if (cache->flags & XCACHE_LRU_HEAP) {
184                 if (ce->h != NoNode){
185                         xbinheap_increasekey(&cache->binheap, ce->h, cache->time);
186                 } else {
187                         ce->h = xbinheap_insert(&cache->binheap, cache->time, idx);
188                         if (ce->h == NoNode){
189                                 XSEGLOG("BUG: Cannot insert to lru binary heap");
190                         }
191                 }
192         }
193 }
194
195 /* __xcache_entry_get needs no lock. */
196 static void __xcache_entry_get(struct xcache *cache, xqindex idx)
197 {
198         struct xcache_entry *ce = &cache->nodes[idx];
199         __sync_add_and_fetch(&ce->ref, 1);
200 }
201
202 /*
203  * __xcache_entry_get_and_update must be called with cache->lock held, due to the race for
204  * __update_access_time.
205  */
206 static void __xcache_entry_get_and_update(struct xcache *cache, xqindex idx)
207 {
208         __xcache_entry_get(cache, idx);
209         __update_access_time(cache, idx);
210 }
211
212 /* after a succesfull call, the handler must be put */
213 static int __xcache_remove_entries(struct xcache *cache, xcache_handler h)
214 {
215         int r;
216         xqindex idx = (xqindex)h;
217         struct xcache_entry *ce = &cache->nodes[idx];
218
219         r = __table_remove(cache->entries, ce->name);
220         if (UNLIKELY(r < 0)){
221                 XSEGLOG("Couldn't delete cache entry from hash table:\n"
222                                 "h: %llu, name: %s, cache->nodes[h].priv: %p, ref: %llu",
223                                 h, ce->name, cache->nodes[idx].priv, cache->nodes[idx].ref);
224                 return r;
225         }
226
227         if (cache->flags & XCACHE_LRU_ARRAY)
228                 cache->times[idx] = XCACHE_LRU_MAX;
229         else if (cache->flags & XCACHE_LRU_HEAP) {
230                 if (ce->h != NoNode) {
231                         if (xbinheap_increasekey(&cache->binheap, ce->h, XCACHE_LRU_MAX) < 0){
232                                 XSEGLOG("BUG: cannot increase key to XCACHE_LRU_MAX");
233                         }
234                         if (xbinheap_extract(&cache->binheap) == NoNode){
235                                 XSEGLOG("BUG: cannot remove cache entry from lru");
236                         }
237                         ce->h = NoNode;
238                 }
239         }
240         //XSEGLOG("cache->times[%llu] = %llu", idx, cache->times[idx]);
241         return 0;
242 }
243
244 /*
245  * __xcache_remove_rm must always be called with cache->rm_lock held.
246  * It finalizes the removal of an entry from the cache.
247  */
248 static int __xcache_remove_rm(struct xcache *cache, xcache_handler h)
249 {
250         int r;
251         xqindex idx = (xqindex)h;
252         struct xcache_entry *ce = &cache->nodes[idx];
253
254         r = __table_remove(cache->rm_entries, ce->name);
255         if (UNLIKELY(r < 0)) {
256                 XSEGLOG("Couldn't delete cache entry from hash table:\n"
257                                 "h: %llu, name: %s, cache->nodes[h].priv: %p, ref: %llu",
258                                 h, ce->name, cache->nodes[idx].priv, cache->nodes[idx].ref);
259                 return r;
260         }
261
262         return r;
263 }
264
265 /*
266  * __xcache_lookup_rm must always be called with cache->rm_lock held.
267  * It checks if name exists in "rm_entries"
268  */
269 static xcache_handler __xcache_lookup_rm(struct xcache *cache, char *name)
270 {
271         return __table_lookup(cache->rm_entries, name);
272 }
273
274 static xcache_handler __xcache_lookup_and_get_rm(struct xcache *cache, char *name)
275 {
276         xcache_handler h;
277         h = __xcache_lookup_rm(cache, name);
278         if (h != NoEntry){
279                 __xcache_entry_get_and_update(cache, h);
280         }
281         return h;
282 }
283
284 static xcache_handler __xcache_lookup_entries(struct xcache *cache, char *name)
285 {
286         return __table_lookup(cache->entries, name);
287 }
288
289 static xcache_handler __xcache_lookup_and_get_entries(struct xcache *cache, char *name)
290 {
291         xcache_handler h;
292         h = __xcache_lookup_entries(cache, name);
293         if (h != NoEntry){
294                 __xcache_entry_get_and_update(cache, h);
295         }
296         return h;
297 }
298
299 static xcache_handler __xcache_insert_rm(struct xcache *cache, xcache_handler h)
300 {
301         return __table_insert(&cache->rm_entries, cache, h);
302 }
303
304 static xcache_handler __xcache_insert_entries(struct xcache *cache, xcache_handler h)
305 {
306         return __table_insert(&cache->entries, cache, h);
307 }
308 /*
309  * xcache_entry_put is thread-safe even without cache->lock (cache->rm_lock is
310  * crucial though). This is why:
311  *
312  * a. We put the entry's refcount. If it doesn't drop to zero, we can move on.
313  * b. If it drops to zero, then we inform the peer that the cache entry is about
314  *    to leave with the "on_evict" hook.
315  * c. If peer returns a negative value, we do not free the entry yet and leave.
316  * d. If everything is ok, we get the rm_lock.
317  * e. Since rm_lock is needed for a re-insertion or a simple 'get' for this
318  *    entry, we can safely check again the entry's ref here. If it's > 0, then
319  *    someone beat us to it and has a job to do. If it's 0 though, then by
320  *    removing the entry from "rm_entries" we are safe to free that entry
321  *    without rm_lock.
322  *
323  * NOTE: the peer must guarantee that the refcount does not drop to zero twice
324  * while xcache_entry_put runs.
325  */
326 static void xcache_entry_put(struct xcache *cache, xqindex idx)
327 {
328         struct xcache_entry *ce = &cache->nodes[idx];
329         unsigned long ref = __sync_sub_and_fetch(&ce->ref, 1);
330         int r = 0;
331
332         if (ref > 0)
333                 return;
334
335         if (cache->ops.on_finalize &&
336                         cache->ops.on_finalize(cache->priv, ce->priv))
337                 return;
338
339         if (ce->state == NODE_EVICTED) {
340                 xlock_acquire(&cache->rm_lock, 1);
341                 if (ce->ref == 0)
342                         r = __xcache_remove_rm(cache, idx);
343                 xlock_release(&cache->rm_lock);
344         }
345
346         if (cache->ops.on_put)
347                 cache->ops.on_put(cache->priv, ce->priv);
348
349         if (r >= 0)
350                 free_cache_entry(cache, idx);
351 }
352
353 static int xcache_entry_init(struct xcache *cache, xqindex idx, char *name)
354 {
355         int r = 0;
356         struct xcache_entry *ce = &cache->nodes[idx];
357
358         xlock_release(&ce->lock);
359         if (UNLIKELY(ce->ref != 0))
360                 XSEGLOG("BUG: New entry has ref != 0 (h: %lu, ref: %lu)", idx, ce->ref);
361         ce->ref = 1;
362         strncpy(ce->name, name, XSEG_MAX_TARGETLEN);
363         ce->name[XSEG_MAX_TARGETLEN] = 0;
364         ce->h = NoNode;
365         ce->state = NODE_ACTIVE;
366         if (cache->ops.on_init)
367                 r = cache->ops.on_init(cache->priv, ce->priv);
368         return r;
369 }
370
371
372 static xqindex __xcache_lru(struct xcache *cache)
373 {
374         uint64_t min = -2;
375         xqindex i, lru = Noneidx;
376         struct xcache_entry *ce;
377
378         if (cache->flags & XCACHE_LRU_ARRAY){
379                 for (i = 0; i < cache->nr_nodes; i++) {
380                         //XSEGLOG("cache->times[%llu] = %llu", i, cache->times[i]);
381                         if (min > cache->times[i]){
382                                 min = cache->times[i];
383                                 lru = i;
384                         }
385                 }
386                 //FIXME if cache->times[lru] == XCACHE_LRU_MAX
387                 //      lru = NoEntry;
388                 //XSEGLOG("Found lru cache->times[%llu] = %llu", lru, cache->times[lru]);
389         } else if (cache->flags & XCACHE_LRU_HEAP) {
390                 lru = xbinheap_extract(&cache->binheap);
391                 if (lru == NoNode)
392                         return Noneidx;
393                 ce = &cache->nodes[lru];
394                 ce->h = NoNode;
395         }
396         return lru;
397 }
398
399 static int __xcache_evict(struct xcache *cache, xcache_handler h)
400 {
401         //pre_evict
402         //remove from entries
403         //insert to rm_entries
404         //post_evict
405
406         struct xcache_entry *ce;
407         int r;
408         r = __xcache_remove_entries(cache, h);
409         if (r < 0) {
410                 XSEGLOG("Failed to evict %llu from entries", h);
411                 return -1;
412         }
413
414         /*
415         if (NoPendingActions)
416                 return 0;
417         */
418         ce = &cache->nodes[h];
419         if (!cache->ops.post_evict ||
420                         !cache->ops.post_evict(cache->priv, ce->priv))
421                 return 0;
422
423         ce->state = NODE_EVICTED;
424         xlock_acquire(&cache->rm_lock, 1);
425         r = __xcache_insert_rm(cache, h);
426         xlock_release(&cache->rm_lock);
427
428         if (r < 0) {
429                 ce->state = NODE_ACTIVE;
430                 XSEGLOG("BUG: Failed insert %llu to rm_entries", h);
431                 return -1;
432         }
433
434         return 0;
435 }
436
437 static xcache_handler __xcache_evict_lru(struct xcache *cache)
438 {
439         int r;
440         xcache_handler lru;
441
442         lru = __xcache_lru(cache);
443         if (lru == NoEntry){
444                 XSEGLOG("BUG: No lru found");
445                 return NoEntry;
446         }
447
448         r = __xcache_evict(cache, lru);
449         if (r < 0)
450                 return NoEntry;
451         return lru;
452 }
453
454 static int __xcache_remove(struct xcache *cache, xcache_handler h)
455 {
456         return __xcache_remove_entries(cache, h);
457 }
458
459 /*
460  * __xcache_insert is called with cache->lock held and has to hold
461  * cache->rm_lock too when looking/inserting an entry in rm_entries. The
462  * process is the following:
463  *
464  * 1. First, we search in "entries" to check if there was a race. If so, we
465  *    return the appropriate handler.
466  * 2. Then, we search in "rm_entries", to check if the target has been removed.
467  *    If so we update its ref and try to insert it in cache.
468  *
469  * <if any of these steps fail, we return NoEntry. If the node is invalid, we
470  * return DelEntry>
471  *
472  * 3. We now have a valid handler, either the one we started with or the one we
473  *    re-insert. We check if there is space for it in "entries". If no space
474  *    exists, we remove the oldest entry (LRU) and insert the new entry,
475  *
476  * When a valid handler is returned, the associated entry's refcount as well as
477  * access time will always be increased. A re-inserted entry is 'got' two times,
478  * the one to equalize the put that occured after its removal.
479  * FIXME: Not the sanest decision to delete entries *first* from one table
480  * *and then* copy them to other tables. Handle fails properly.
481  */
482 static xcache_handler __xcache_insert(struct xcache *cache, xcache_handler h,
483                                         xcache_handler *lru_handler,
484                                         xcache_handler *reinsert_handler)
485 {
486         int r;
487         struct xcache_entry *ce;
488         xcache_handler tmp_h, lru;
489
490         lru = NoEntry;
491         *lru_handler = NoEntry;
492         *reinsert_handler = NoEntry;
493         ce = &cache->nodes[h];
494
495         /* lookup first to ensure we don't overwrite entries */
496         tmp_h = __xcache_lookup_and_get_entries(cache, ce->name);
497         if (tmp_h != NoEntry)
498                 return tmp_h;
499
500         /* check if our "older self" exists in the rm_entries */
501         xlock_acquire(&cache->rm_lock, 1);
502         tmp_h = __xcache_lookup_rm(cache, ce->name);
503         if (tmp_h != NoEntry) {
504                 r = __xcache_remove_rm(cache, tmp_h);
505                 if (r < 0) {
506                         XSEGLOG("Could not remove found entry (%llu) for %s"
507                                 "from rm_entries", tmp_h, ce->name);
508                         xlock_release(&cache->rm_lock);
509                         return NoEntry;
510                 }
511                 ce = &cache->nodes[tmp_h];
512                 //assert state = NODE_EVICTED
513                 ce->state = NODE_ACTIVE;
514                 /* we need to 'get' entry with rm_lock */
515                 __xcache_entry_get_and_update(cache, tmp_h);
516                 h = tmp_h;
517                 *reinsert_handler = tmp_h;
518         }
519         xlock_release(&cache->rm_lock);
520
521         /* insert new entry to cache */
522         r = __xcache_insert_entries(cache, h);
523         if (r == -XHASH_ENOSPC){
524                 lru = __xcache_evict_lru(cache);
525                 if (UNLIKELY(lru == NoEntry)) {
526                         XSEGLOG("Failed to evict lru entry");
527                         return NoEntry;
528                 }
529                 *lru_handler = lru;
530                 /*
531                  * Cache entry is put when this function returns, without the
532                  * cache lock held.
533                  */
534                 r = __xcache_insert_entries(cache, h);
535                 if (r < 0) {
536                         XSEGLOG("BUG: failed to insert enries after eviction");
537                         return NoEntry;
538                 }
539         }
540         if (r >= 0) {
541                 if (h != tmp_h)
542                         __xcache_entry_get_and_update(cache, h);
543         }
544
545         return ((r < 0) ? NoEntry : h);
546 }
547
548 /*
549  * xcache_insert tries to insert an xcache handler in cache.
550  * On success, it returns either the same xcache handler or another one that is
551  * associated with a cache entry with the same key (name).
552  * If the cache node is marked as deleted, we return DelEntry.
553  * If the insertion fails for any other reason, we return NoEntry.
554  *
555  * Finally, if a successful insertion results to an LRU eviction, we put the
556  * LRU entry.
557  */
558 xcache_handler xcache_insert(struct xcache *cache, xcache_handler h)
559 {
560         struct xcache_entry *ce;
561         xcache_handler ret = NoEntry;
562         xcache_handler lru = NoEntry;
563         xcache_handler reinsert_handler = NoEntry;
564
565         xlock_acquire(&cache->lock, 1);
566         ret = __xcache_insert(cache, h, &lru, &reinsert_handler);
567         xlock_release(&cache->lock);
568
569         if (lru != NoEntry) {
570                 if (UNLIKELY(ret == NoEntry))
571                         XSEGLOG("BUG: Unsuccessful insertion lead to LRU eviction.");
572                 xcache_entry_put(cache, lru);
573                 ce = &cache->nodes[lru];
574                 if (cache->ops.on_evict)
575                         cache->ops.on_evict(cache->priv, ce->priv);
576         }
577
578         if (reinsert_handler != NoEntry) {
579                 if (UNLIKELY(ret == reinsert_handler))
580                         XSEGLOG("BUG: Reinsertion, return another handler");
581                 ce = &cache->nodes[reinsert_handler];
582                 if (cache->ops.on_reinsert)
583                         cache->ops.on_reinsert(cache->priv, ce->priv);
584         }
585
586         return ret;
587 }
588
589 /*
590  * xcache_lookup looks only in "entries". The are several arguments behind this
591  * choice:
592  *
593  * a. Speed: xcache_lookup won't bother with geting rm->lock or re-insert
594  *    entries in other hash tables this way.
595  * b. Common case: Rarely will we ever need to lookup in "rm_entries"
596  * c. Simplicity: <self-explanatory>
597  */
598 xcache_handler xcache_lookup(struct xcache *cache, char *name)
599 {
600         xcache_handler h = NoEntry;
601         xlock_acquire(&cache->lock, 1);
602         h = __xcache_lookup_and_get_entries(cache, name);
603         xlock_release(&cache->lock);
604         return h;
605 }
606
607 xcache_handler xcache_alloc_init(struct xcache *cache, char *name)
608 {
609         int r;
610         xcache_handler h;
611         xqindex idx = alloc_cache_entry(cache);
612         if (idx == Noneidx)
613                 return NoEntry;
614 //      h = (xcache_handle)idx
615         r = xcache_entry_init(cache, idx, name);
616         if (r < 0){
617                 free_cache_entry(cache, idx);
618                 return NoEntry;
619         }
620         h = idx;
621         return h;
622 }
623
624 /*
625  * xcache_init initializes the following:
626  * a. Two xhash tables:
627  *    i. "entries", which indexes the active cache entries.
628  *    ii. "rm_entries", which indexes the removed cache entries that are on the
629  *        process of flushing their dirty data and/or finishing their pending
630  *        requests.
631  * b. The cache nodes. They are typically 2 x cache_size, since we need room for
632  *    the removed cache entries too.
633  * c. The LRU, which is chosen on compile time.
634  */
635 int xcache_init(struct xcache *cache, uint32_t xcache_size,
636                 struct xcache_ops *ops, uint32_t flags, void *priv)
637 {
638         struct xcache_entry *ce;
639         unsigned long i;
640         xhashidx shift;
641         uint32_t tmp_size, floor_size, ceil_size;
642
643         if (!xcache_size)
644                 return -1;
645
646         /* xcache size must be a power of 2.
647          * Enforce it, by choosing the power of two that is closer to the xcache
648          * size requested.
649          */
650         floor_size = 1 << (sizeof(xcache_size) * 8 - __builtin_clz(xcache_size) -1);
651         ceil_size = 1 << (sizeof(xcache_size) * 8 - __builtin_clz(xcache_size));
652
653         if (xcache_size - floor_size < ceil_size - xcache_size)
654                 cache->size = floor_size;
655         else
656                 cache->size = ceil_size;
657
658         if (cache->size != xcache_size)
659                 XSEGLOG("Cache has been resized from %lu entries to %lu entries",
660                                 xcache_size, cache->size);
661
662         /*
663          * Here we choose a proper size for the hash table.
664          * It must be able to contain at least xcache_size elements, before
665          * returns an -EXHASH_RESIZE.
666          * Thus it must be at least 3/2 * xcache_size (OK, the minimum power of
667          * two that meets this requirement to be exact).
668          *
669          * By choosing a xhash size 8 times the minimum required, we drastically
670          * decrease the number or xhash rebuilts required by xhash for
671          * perfomance reasons, sacrificing a logical amount of memory.
672          *
673          */
674
675         tmp_size = 3 * cache->size  / 2;
676         shift = sizeof(tmp_size) * 8 - __builtin_clz(tmp_size);
677         shift += 3;
678
679         xlock_release(&cache->lock);
680         xlock_release(&cache->rm_lock);
681         cache->nr_nodes = cache->size * 2;
682         cache->time = 0;
683         cache->ops = *ops;
684         cache->priv = priv;
685         cache->flags = flags;
686
687         /* TODO: assert cache->nodes isn't UINT64_MAX */
688
689         if (!xq_alloc_seq(&cache->free_nodes, cache->nr_nodes,
690                                 cache->nr_nodes)){
691                 return -1;
692         }
693
694         cache->entries = xhash_new(shift, cache->size, STRING);
695         if (!cache->entries){
696                 goto out_free_q;
697         }
698
699         /*
700          * "rm_entries" must have the same size as "entries" since each one indexes
701          * at most (cache->nodes / 2) entries
702          */
703         cache->rm_entries = xhash_new(shift, cache->size, STRING);
704         if (!cache->rm_entries){
705                 goto out_free_entries;
706         }
707
708         cache->nodes = xtypes_malloc(cache->nr_nodes * sizeof(struct xcache_entry));
709         if (!cache->nodes){
710                 goto out_free_rm_entries;
711         }
712
713         if (flags & XCACHE_LRU_ARRAY){
714                 cache->times = xtypes_malloc(cache->nr_nodes * sizeof(uint64_t));
715                 if (!cache->times){
716                         goto out_free_nodes;
717                 }
718                 for (i = 0; i < cache->nr_nodes; i++) {
719                         cache->times[i] = XCACHE_LRU_MAX; //so lru will never return a this value;
720                 }
721         }
722
723         if (cache->ops.on_node_init){
724                 for (i = 0; i < cache->nr_nodes; i++) {
725                         ce = &cache->nodes[i];
726                         ce->ref = 0;
727                         ce->priv = cache->ops.on_node_init(cache->priv, (void *)&i);
728                         if (!ce->priv)
729                                 goto out_free_times;
730                 }
731         }
732         if (flags & XCACHE_LRU_HEAP){
733                 if (xbinheap_init(&cache->binheap, cache->size, XBINHEAP_MIN,
734                                         NULL) < 0){
735                         goto out_free_times;
736                 }
737         }
738
739         return 0;
740
741 out_free_times:
742         if (flags & XCACHE_LRU_ARRAY)
743                 xtypes_free(cache->times);
744 out_free_nodes:
745         xtypes_free(cache->nodes);
746 out_free_rm_entries:
747         xhash_free(cache->rm_entries);
748 out_free_entries:
749         xhash_free(cache->entries);
750 out_free_q:
751         xq_free(&cache->free_nodes);
752         return -1;
753
754 }
755
756 int xcache_remove(struct xcache *cache, xcache_handler h)
757 {
758         int r;
759         xlock_acquire(&cache->lock, 1);
760         r = __xcache_remove(cache, h);
761         xlock_release(&cache->lock);
762         return r;
763 }
764
765 //This is just an atomic
766 //      lookup
767 //      remove if Found
768 int xcache_invalidate(struct xcache *cache, char *name)
769 {
770         int r = 0;
771         xcache_handler h;
772         xlock_acquire(&cache->lock, 1);
773         xlock_acquire(&cache->rm_lock, 1);
774
775
776         h = __xcache_lookup_entries(cache, name);
777         if (h != NoEntry){
778                 r = __xcache_remove_entries(cache, h);
779                 goto out_put;
780         }
781
782         h = __xcache_lookup_rm(cache, name);
783         if (h != NoEntry){
784                 r = __xcache_remove_rm(cache, h);
785         }
786
787 out:
788         xlock_release(&cache->rm_lock);
789         xlock_release(&cache->lock);
790
791         return r;
792
793 out_put:
794         xlock_release(&cache->rm_lock);
795         xlock_release(&cache->lock);
796
797         if (r >=0)
798                 xcache_put(cache, h);
799         return r;
800
801 }
802
803 /*
804  * xcache_get is called with no locking.
805  * It simply increases the refcount by one. The entry can either be in "entries"
806  * or "rm_entries".
807  */
808 void xcache_get(struct xcache *cache, xcache_handler h)
809 {
810         xqindex idx = (xqindex)h;
811         //xlock_acquire(&cache->lock, 1);
812         __xcache_entry_get(cache, idx);
813         //xlock_release(&cache->lock);
814 }
815
816 /*
817  * xcache_put is called with no locking, but when the refcount of the entry
818  * drops to 0, it takes rm_lock.
819  * The entry can either be in "entries" or "rm_entries".
820  */
821 void xcache_put(struct xcache *cache, xcache_handler h)
822 {
823         xqindex idx = (xqindex)h;
824         //xlock_acquire(&cache->lock, 1);
825         xcache_entry_put(cache, idx);
826         //xlock_release(&cache->lock);
827 }
828
829 /*
830  * xcache_free_new is called with no locking and will not hold any lock too.
831  * It must be called when an entry has not been inserted in cache (e.g. due to
832  * re-insertion) and we want to free it. In this case, the refcount can safely
833  * drop manually to zero
834  */
835 void xcache_free_new(struct xcache *cache, xcache_handler h)
836 {
837         xqindex idx = (xqindex)h;
838         struct xcache_entry *ce = &cache->nodes[idx];
839
840         ce->ref = 0;
841         free_cache_entry(cache, idx);
842 }
843 /*
844  * Put all cache entries.
845  * Does not free cache resources.
846  * xcache_free should be called by the client when all entries are put.
847  */
848 void xcache_close(struct xcache *cache)
849 {
850         uint32_t i;
851         struct xcache_entry *ce;
852         for (i = 0; i < cache->nr_nodes; i++) {
853                 ce = &cache->nodes[i];
854                 if (!ce->ref)
855                         continue;
856                 xcache_invalidate(cache, ce->name);
857                 xcache_entry_put(cache, i);
858         }
859 }
860
861 void xcache_free(struct xcache *cache)
862 {
863         if (cache->flags & XCACHE_LRU_HEAP){
864                 xbinheap_free(&cache->binheap);
865         } else if (cache->flags & XCACHE_LRU_ARRAY){
866                 xtypes_free(cache->times);
867         }
868         xtypes_free(cache->nodes);
869         xhash_free(cache->entries);
870 }
871
872 /*
873  * Return how many free nodes exist.
874  * Hint only, since its racy.
875  */
876 uint64_t xcache_free_nodes(struct xcache *cache)
877 {
878         return (uint64_t)__count_free_nodes(cache);
879 }
880
881 #ifdef __KERNEL__
882 #include <linux/module.h>
883 #include <xtypes/xcache_exports.h>
884 #endif