-
Notifications
You must be signed in to change notification settings - Fork 1.2k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
CacheLongKeyLIRS concurrency improvements #3069
base: master
Are you sure you want to change the base?
Changes from 6 commits
fea847c
a81f372
3b9dc9b
f342c4a
0e9df84
0a5fedb
a3a4988
af0457b
3a11c81
90d0135
d233a18
69deaba
7656f24
17ede53
0f30f6a
8b261e8
8f49a13
4711416
3ec21cd
a31529c
95970d7
beccc44
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -6,6 +6,8 @@ | |
package org.h2.mvstore.cache; | ||
|
||
import java.lang.ref.WeakReference; | ||
import java.util.concurrent.ConcurrentSkipListSet; | ||
import java.util.concurrent.locks.ReentrantLock; | ||
import java.util.ArrayList; | ||
import java.util.HashMap; | ||
import java.util.HashSet; | ||
|
@@ -602,6 +604,21 @@ private static class Segment<V> { | |
*/ | ||
private int stackMoveCounter; | ||
|
||
/* | ||
* Holds entries that were accessed by concurrent get() calls | ||
*/ | ||
private final ConcurrentSkipListSet<Entry<V>> concAccess; | ||
|
||
/* | ||
* Serializes access to this segment | ||
*/ | ||
private final ReentrantLock l; | ||
|
||
/* | ||
* Used as null value for ConcurrentSkipListSet | ||
*/ | ||
private final Entry<V> ENTRY_NULL = new Entry<>(); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think it would be better to convert There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Make sense. Will do. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Actually there is no separate lock for this null value - most of the time processing is done inside the lock that is currently active. But I agree that it looks a bit 'artificial'. |
||
|
||
/** | ||
* Create a new cache segment. | ||
* @param maxMemory the maximum memory to use | ||
|
@@ -632,6 +649,9 @@ private static class Segment<V> { | |
@SuppressWarnings("unchecked") | ||
Entry<V>[] e = new Entry[len]; | ||
entries = e; | ||
|
||
concAccess = new ConcurrentSkipListSet<>(); | ||
l = new ReentrantLock(); | ||
} | ||
|
||
/** | ||
|
@@ -710,17 +730,42 @@ private void addToMap(Entry<V> e) { | |
* @param e the entry | ||
* @return the value, or null if there is no resident entry | ||
*/ | ||
synchronized V get(Entry<V> e) { | ||
V get(Entry<V> e) { | ||
V value = e == null ? null : e.getValue(); | ||
if (value == null) { | ||
// the entry was not found | ||
// or it was a non-resident entry | ||
misses++; | ||
} else { | ||
access(e); | ||
hits++; | ||
if (!l.tryLock()) { | ||
Entry<V> e2 = value == null ? ENTRY_NULL : e; | ||
concAccess.add(e2); | ||
return value; | ||
} | ||
try { | ||
if (value == null) { | ||
// the entry was not found | ||
// or it was a non-resident entry | ||
misses++; | ||
} else { | ||
access(e); | ||
hits++; | ||
} | ||
|
||
// process entries that were accessed concurrently | ||
while (true) { | ||
Entry<V> p = concAccess.pollFirst(); | ||
if (p == null) { | ||
break; | ||
} | ||
if (p == ENTRY_NULL) { | ||
misses++; | ||
} else { | ||
access(p); | ||
hits++; | ||
} | ||
} | ||
|
||
return value; | ||
} | ||
finally { | ||
l.unlock(); | ||
} | ||
return value; | ||
} | ||
|
||
/** | ||
|
@@ -749,9 +794,8 @@ private void access(Entry<V> e) { | |
V v = e.getValue(); | ||
if (v != null) { | ||
removeFromQueue(e); | ||
if (e.reference != null) { | ||
if (e.value == null) { | ||
e.value = v; | ||
e.reference = null; | ||
usedMemory += e.memory; | ||
} | ||
if (e.stackNext != null) { | ||
|
@@ -787,7 +831,17 @@ private void access(Entry<V> e) { | |
* @param memory the memory used for the given entry | ||
* @return the old value, or null if there was no resident entry | ||
*/ | ||
synchronized V put(long key, int hash, V value, int memory) { | ||
V put(long key, int hash, V value, int memory) { | ||
l.lock(); | ||
try { | ||
return putUnlocked(key, hash, value, memory); | ||
} | ||
finally { | ||
l.unlock(); | ||
} | ||
} | ||
|
||
private V putUnlocked(long key, int hash, V value, int memory) { | ||
Entry<V> e = find(key, hash); | ||
boolean existed = e != null; | ||
V old = null; | ||
|
@@ -832,7 +886,17 @@ synchronized V put(long key, int hash, V value, int memory) { | |
* @param hash the hash | ||
* @return the old value, or null if there was no resident entry | ||
*/ | ||
synchronized V remove(long key, int hash) { | ||
V remove(long key, int hash) { | ||
l.lock(); | ||
try { | ||
return removeUnlocked(key, hash); | ||
} | ||
finally { | ||
l.unlock(); | ||
} | ||
} | ||
|
||
private V removeUnlocked(long key, int hash) { | ||
int index = hash & mask; | ||
Entry<V> e = entries[index]; | ||
if (e == null) { | ||
|
@@ -897,7 +961,6 @@ private void evictBlock() { | |
Entry<V> e = queue.queuePrev; | ||
usedMemory -= e.memory; | ||
removeFromQueue(e); | ||
e.reference = new WeakReference<>(e.value); | ||
e.value = null; | ||
addToQueue(queue2, e); | ||
// the size of the non-resident-cold entries needs to be limited | ||
|
@@ -1031,7 +1094,17 @@ private void removeFromQueue(Entry<V> e) { | |
* @param nonResident true for non-resident entries | ||
* @return the key list | ||
*/ | ||
synchronized List<Long> keys(boolean cold, boolean nonResident) { | ||
List<Long> keys(boolean cold, boolean nonResident) { | ||
l.lock(); | ||
try { | ||
return keysUnlocked(cold, nonResident); | ||
} | ||
finally { | ||
l.unlock(); | ||
} | ||
} | ||
|
||
private List<Long> keysUnlocked(boolean cold, boolean nonResident) { | ||
ArrayList<Long> keys = new ArrayList<>(); | ||
if (cold) { | ||
Entry<V> start = nonResident ? queue2 : queue; | ||
|
@@ -1053,7 +1126,17 @@ synchronized List<Long> keys(boolean cold, boolean nonResident) { | |
* | ||
* @return the set of keys | ||
*/ | ||
synchronized Set<Long> keySet() { | ||
Set<Long> keySet() { | ||
l.lock(); | ||
try { | ||
return keySetUnlocked(); | ||
} | ||
finally { | ||
l.unlock(); | ||
} | ||
} | ||
|
||
private Set<Long> keySetUnlocked() { | ||
HashSet<Long> set = new HashSet<>(); | ||
for (Entry<V> e = stack.stackNext; e != stack; e = e.stackNext) { | ||
set.add(e.key); | ||
|
@@ -1086,7 +1169,7 @@ void setMaxMemory(long maxMemory) { | |
* | ||
* @param <V> the value type | ||
*/ | ||
static class Entry<V> { | ||
static class Entry<V> implements Comparable<Entry<V>> { | ||
|
||
/** | ||
* The key. | ||
|
@@ -1148,10 +1231,15 @@ static class Entry<V> { | |
this.key = key; | ||
this.memory = memory; | ||
this.value = value; | ||
if (value != null) { | ||
this.reference = new WeakReference<>(value); | ||
} | ||
} | ||
|
||
Entry(Entry<V> old) { | ||
this(old.key, old.value, old.memory); | ||
this.key = old.key; | ||
this.memory = old.memory; | ||
this.value = old.value; | ||
this.reference = old.reference; | ||
this.topMove = old.topMove; | ||
} | ||
|
@@ -1166,12 +1254,17 @@ boolean isHot() { | |
} | ||
|
||
V getValue() { | ||
return value == null ? reference.get() : value; | ||
final V v = value; | ||
return v == null ? reference.get() : v; | ||
} | ||
|
||
int getMemory() { | ||
return value == null ? 0 : memory; | ||
} | ||
|
||
public int compareTo(Entry<V> tgt) { | ||
return key == tgt.key ? 0 : key < tgt.key ? -1 : 1; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Agreed. |
||
} | ||
} | ||
|
||
/** | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please, explain why you need these settings.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We use H2 as a cache for binary chunks in a high-concurrency environment (currently 60 threads). After we increased cacheConcurrency up to 1024, H2 performs really well most of the time (the same as or even better than PostgreSQL). The only problem arises after we empty the db content - we observed a significant degradation in throughput and CPU usage.
Here's what we run into:
// if unsaved memory creation rate is too high,
// some back pressure needs to be applied
// to slow things down and avoid OOME
Looks like 2 Mb is too small for 150 Mb per sec. This is why we need to introduce AUTO_COMMIT_BUFFER_SIZE_KB.