-
Notifications
You must be signed in to change notification settings - Fork 1.2k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
CacheLongKeyLIRS concurrency improvements #3069
base: master
Are you sure you want to change the base?
Changes from 15 commits
fea847c
a81f372
3b9dc9b
f342c4a
0e9df84
0a5fedb
a3a4988
af0457b
3a11c81
90d0135
d233a18
69deaba
7656f24
17ede53
0f30f6a
8b261e8
8f49a13
4711416
3ec21cd
a31529c
95970d7
beccc44
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -6,6 +6,10 @@ | |
package org.h2.mvstore.cache; | ||
|
||
import java.lang.ref.WeakReference; | ||
import java.util.concurrent.ConcurrentSkipListSet; | ||
import java.util.concurrent.locks.ReentrantLock; | ||
import java.util.concurrent.atomic.AtomicLong; | ||
import java.util.function.Supplier; | ||
import java.util.ArrayList; | ||
import java.util.HashMap; | ||
import java.util.HashSet; | ||
|
@@ -161,10 +165,9 @@ public V put(long key, V value, int memory) { | |
// check whether resize is required: synchronize on s, to avoid | ||
// concurrent resizes (concurrent reads read | ||
// from the old segment) | ||
synchronized (s) { | ||
s = resizeIfNeeded(s, segmentIndex); | ||
return s.put(key, hash, value, memory); | ||
} | ||
return s.withLock(() -> { | ||
return resizeIfNeeded(s, segmentIndex).put(key, hash, value, memory); | ||
}); | ||
} | ||
|
||
private Segment<V> resizeIfNeeded(Segment<V> s, int segmentIndex) { | ||
|
@@ -208,10 +211,9 @@ public V remove(long key) { | |
// check whether resize is required: synchronize on s, to avoid | ||
// concurrent resizes (concurrent reads read | ||
// from the old segment) | ||
synchronized (s) { | ||
s = resizeIfNeeded(s, segmentIndex); | ||
return s.remove(key, hash); | ||
} | ||
return s.withLock(() -> { | ||
return resizeIfNeeded(s, segmentIndex).remove(key, hash); | ||
}); | ||
} | ||
|
||
/** | ||
|
@@ -312,7 +314,7 @@ public long getMaxMemory() { | |
* | ||
* @return the entry set | ||
*/ | ||
public synchronized Set<Map.Entry<Long, V>> entrySet() { | ||
public Set<Map.Entry<Long, V>> entrySet() { | ||
return getMap().entrySet(); | ||
} | ||
|
||
|
@@ -389,7 +391,7 @@ public long getHits() { | |
public long getMisses() { | ||
int x = 0; | ||
for (Segment<V> s : segments) { | ||
x += s.misses; | ||
x += s.misses.get(); | ||
} | ||
return x; | ||
} | ||
|
@@ -491,9 +493,10 @@ public void putAll(Map<Long, ? extends V> m) { | |
*/ | ||
public void trimNonResidentQueue() { | ||
for (Segment<V> s : segments) { | ||
synchronized (s) { | ||
s.withLock(() -> { | ||
s.trimNonResidentQueue(); | ||
} | ||
return null; | ||
}); | ||
} | ||
} | ||
|
||
|
@@ -527,7 +530,7 @@ private static class Segment<V> { | |
/** | ||
* The number of cache misses. | ||
*/ | ||
long misses; | ||
final AtomicLong misses; | ||
|
||
/** | ||
* The map array. The size is always a power of 2. | ||
|
@@ -602,6 +605,16 @@ private static class Segment<V> { | |
*/ | ||
private int stackMoveCounter; | ||
|
||
/* | ||
* Holds entries that were concurrently get() | ||
*/ | ||
private final ConcurrentSkipListSet<Entry<V>> concAccess; | ||
|
||
/* | ||
* Serialize access to this segments | ||
*/ | ||
private final ReentrantLock l; | ||
|
||
/** | ||
* Create a new cache segment. | ||
* @param maxMemory the maximum memory to use | ||
|
@@ -632,6 +645,10 @@ private static class Segment<V> { | |
@SuppressWarnings("unchecked") | ||
Entry<V>[] e = new Entry[len]; | ||
entries = e; | ||
|
||
misses = new AtomicLong(); | ||
concAccess = new ConcurrentSkipListSet<>(); | ||
l = new ReentrantLock(); | ||
} | ||
|
||
/** | ||
|
@@ -646,7 +663,7 @@ private static class Segment<V> { | |
this(old.maxMemory, old.stackMoveDistance, len, | ||
old.nonResidentQueueSize, old.nonResidentQueueSizeHigh); | ||
hits = old.hits; | ||
misses = old.misses; | ||
misses.set(old.misses.get()); | ||
Entry<V> s = old.stack.stackPrev; | ||
while (s != old.stack) { | ||
Entry<V> e = new Entry<>(s); | ||
|
@@ -710,17 +727,41 @@ private void addToMap(Entry<V> e) { | |
* @param e the entry | ||
* @return the value, or null if there is no resident entry | ||
*/ | ||
synchronized V get(Entry<V> e) { | ||
V get(Entry<V> e) { | ||
V value = e == null ? null : e.getValue(); | ||
if (value == null) { | ||
// the entry was not found | ||
// or it was a non-resident entry | ||
misses++; | ||
} else { | ||
access(e); | ||
hits++; | ||
if (!l.tryLock()) { | ||
if (value == null) { | ||
misses.incrementAndGet(); | ||
} else { | ||
concAccess.add(e); | ||
} | ||
return value; | ||
} | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. When |
||
try { | ||
if (value == null) { | ||
// the entry was not found | ||
// or it was a non-resident entry | ||
misses.incrementAndGet(); | ||
} else { | ||
access(e); | ||
hits++; | ||
} | ||
|
||
// process entries that were accessed concurrently | ||
while (true) { | ||
Entry<V> p = concAccess.pollFirst(); | ||
if (p == null) { | ||
break; | ||
} | ||
access(p); | ||
hits++; | ||
} | ||
|
||
return value; | ||
} | ||
finally { | ||
l.unlock(); | ||
} | ||
return value; | ||
} | ||
|
||
/** | ||
|
@@ -749,9 +790,8 @@ private void access(Entry<V> e) { | |
V v = e.getValue(); | ||
if (v != null) { | ||
removeFromQueue(e); | ||
if (e.reference != null) { | ||
if (e.value == null) { | ||
e.value = v; | ||
e.reference = null; | ||
usedMemory += e.memory; | ||
} | ||
if (e.stackNext != null) { | ||
|
@@ -787,7 +827,13 @@ private void access(Entry<V> e) { | |
* @param memory the memory used for the given entry | ||
* @return the old value, or null if there was no resident entry | ||
*/ | ||
synchronized V put(long key, int hash, V value, int memory) { | ||
V put(long key, int hash, V value, int memory) { | ||
return withLock(() -> { | ||
return putUnlocked(key, hash, value, memory); | ||
}); | ||
} | ||
|
||
private V putUnlocked(long key, int hash, V value, int memory) { | ||
Entry<V> e = find(key, hash); | ||
boolean existed = e != null; | ||
V old = null; | ||
|
@@ -832,7 +878,13 @@ synchronized V put(long key, int hash, V value, int memory) { | |
* @param hash the hash | ||
* @return the old value, or null if there was no resident entry | ||
*/ | ||
synchronized V remove(long key, int hash) { | ||
V remove(long key, int hash) { | ||
return withLock(() -> { | ||
return removeUnlocked(key, hash); | ||
}); | ||
} | ||
|
||
private V removeUnlocked(long key, int hash) { | ||
int index = hash & mask; | ||
Entry<V> e = entries[index]; | ||
if (e == null) { | ||
|
@@ -897,7 +949,6 @@ private void evictBlock() { | |
Entry<V> e = queue.queuePrev; | ||
usedMemory -= e.memory; | ||
removeFromQueue(e); | ||
e.reference = new WeakReference<>(e.value); | ||
e.value = null; | ||
addToQueue(queue2, e); | ||
// the size of the non-resident-cold entries needs to be limited | ||
|
@@ -1031,7 +1082,13 @@ private void removeFromQueue(Entry<V> e) { | |
* @param nonResident true for non-resident entries | ||
* @return the key list | ||
*/ | ||
synchronized List<Long> keys(boolean cold, boolean nonResident) { | ||
List<Long> keys(boolean cold, boolean nonResident) { | ||
return withLock(() -> { | ||
return keysUnlocked(cold, nonResident); | ||
}); | ||
} | ||
|
||
private List<Long> keysUnlocked(boolean cold, boolean nonResident) { | ||
ArrayList<Long> keys = new ArrayList<>(); | ||
if (cold) { | ||
Entry<V> start = nonResident ? queue2 : queue; | ||
|
@@ -1053,7 +1110,13 @@ synchronized List<Long> keys(boolean cold, boolean nonResident) { | |
* | ||
* @return the set of keys | ||
*/ | ||
synchronized Set<Long> keySet() { | ||
Set<Long> keySet() { | ||
return withLock(() -> { | ||
return keySetUnlocked(); | ||
}); | ||
} | ||
|
||
private Set<Long> keySetUnlocked() { | ||
HashSet<Long> set = new HashSet<>(); | ||
for (Entry<V> e = stack.stackNext; e != stack; e = e.stackNext) { | ||
set.add(e.key); | ||
|
@@ -1074,7 +1137,17 @@ synchronized Set<Long> keySet() { | |
void setMaxMemory(long maxMemory) { | ||
this.maxMemory = maxMemory; | ||
} | ||
|
||
|
||
|
||
<T> T withLock(Supplier<T> c) { | ||
l.lock(); | ||
try { | ||
return c.get(); | ||
} | ||
finally { | ||
l.unlock(); | ||
} | ||
} | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. From my point of view this method doesn't make code more readable and introduction of lambda functions can make it slower. Usually it isn't critical, but it looks strange when you're trying to improve performance. |
||
} | ||
|
||
/** | ||
|
@@ -1086,7 +1159,7 @@ void setMaxMemory(long maxMemory) { | |
* | ||
* @param <V> the value type | ||
*/ | ||
static class Entry<V> { | ||
static class Entry<V> implements Comparable<Entry<V>> { | ||
|
||
/** | ||
* The key. | ||
|
@@ -1148,10 +1221,15 @@ static class Entry<V> { | |
this.key = key; | ||
this.memory = memory; | ||
this.value = value; | ||
if (value != null) { | ||
this.reference = new WeakReference<>(value); | ||
} | ||
} | ||
|
||
Entry(Entry<V> old) { | ||
this(old.key, old.value, old.memory); | ||
this.key = old.key; | ||
this.memory = old.memory; | ||
this.value = old.value; | ||
this.reference = old.reference; | ||
this.topMove = old.topMove; | ||
} | ||
|
@@ -1166,12 +1244,18 @@ boolean isHot() { | |
} | ||
|
||
V getValue() { | ||
return value == null ? reference.get() : value; | ||
final V v = value; | ||
return v == null ? reference.get() : v; | ||
} | ||
|
||
int getMemory() { | ||
return value == null ? 0 : memory; | ||
} | ||
|
||
@Override | ||
public int compareTo(Entry<V> tgt) { | ||
return Long.compare(this.key, tgt.key); | ||
} | ||
} | ||
|
||
/** | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please, explain why you need these settings.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We use H2 as a cache for binary chunks in a high-concurrency environment (currently 60 threads). After we increased cacheConcurrency up to 1024, H2 performed really well most of the time (the same as, or even better than, PostgreSQL). The only problem arose after we emptied the db content — we observed a significant degradation in throughput and CPU usage.
Here's what we run into:
// if unsaved memory creation rate is to high,
// some back pressure need to be applied
// to slow things down and avoid OOME
Looks like 2 MB is too small for 150 MB per second. This is why we need to introduce AUTO_COMMIT_BUFFER_SIZE_KB.