CacheLongKeyLIRS concurrency improvements #3069

Draft: wants to merge 22 commits into master (showing changes from 6 commits)
21 changes: 21 additions & 0 deletions h2/src/main/org/h2/engine/DbSettings.java
@@ -8,6 +8,7 @@
import java.util.HashMap;

import org.h2.api.ErrorCode;
import org.h2.util.Utils;
import org.h2.message.DbException;

/**
@@ -306,6 +307,26 @@ public class DbSettings extends SettingsBase {
*/
public final boolean compressData = get("COMPRESS", false);

/**
* Database setting <code>CACHE_CONCURRENCY</code>
* (default: 16).<br />
* Set the read cache concurrency.
*/
public final int cacheConcurrency = get("CACHE_CONCURRENCY", 16);

/**
* Database setting <code>AUTO_COMMIT_BUFFER_SIZE_KB</code>
* (default: depends on max heap).<br />
* Set the size of the write buffer, in KB disk space (for file-based
* stores). Unless auto-commit is disabled, changes are automatically
* saved once unsaved changes exceed this size.
*
* When the value is 0 or lower, data is not automatically
* stored.
*/
public final int autoCommitBufferSize = get("AUTO_COMMIT_BUFFER_SIZE_KB",
Math.max(1, Math.min(19, Utils.scaleForAvailableMemory(64))) * 1024);
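
As with other DbSettings entries, these values would be supplied as semicolon-separated parameters in the database URL. A minimal usage sketch (path and values are illustrative, not recommendations):

    import java.sql.Connection;
    import java.sql.DriverManager;

    public class CacheSettingsExample {
        public static void main(String[] args) throws Exception {
            // Larger read-cache concurrency and a 64 MB auto-commit
            // buffer for a write-heavy, highly concurrent workload.
            String url = "jdbc:h2:./cacheDb"
                    + ";CACHE_CONCURRENCY=1024"
                    + ";AUTO_COMMIT_BUFFER_SIZE_KB=65536";
            try (Connection conn = DriverManager.getConnection(url)) {
                conn.createStatement().execute("SELECT 1");
            }
        }
    }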
Comment on lines +310 to +328
Contributor: Please explain why you need these settings.

Author: We use H2 as a cache for binary chunks in a high-concurrency environment (currently 60 threads). After we increased cacheConcurrency to 1024, H2 performs really well most of the time (the same as or even better than PostgreSQL). The only problem arises after we empty the db content: we observed a significant degradation in throughput and CPU usage.
Here's what we ran into:

  1. CPU contention in Segment.get (the optimization in this PR). When the primary index is small and all 60 threads want to use it, they compete for exactly the same segments. The only way to fix it is to improve LIRS concurrency.
  2. After fix #1, the write rate increased, and the next bottleneck is the 'back pressure' feature:
    // if the unsaved memory creation rate is too high,
    // some back pressure needs to be applied
    // to slow things down and avoid OOME
    It looks like 2 MB is too small for 150 MB per second. This is why we need to introduce AUTO_COMMIT_BUFFER_SIZE_KB.
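
To illustrate the back-pressure mechanism being discussed, here is a hedged sketch, not the actual MVStore code; unsavedMemory and autoCommitMemory are stand-ins for the real bookkeeping. Writers stall briefly once unsaved changes outgrow the configured buffer, so the background flusher can catch up:

    import java.util.concurrent.TimeUnit;

    class BackPressureSketch {
        // bytes of unsaved changes; updated by writers/flusher (not shown)
        private volatile long unsavedMemory;
        private final long autoCommitMemory; // write buffer size, bytes

        BackPressureSketch(long autoCommitMemory) {
            this.autoCommitMemory = autoCommitMemory;
        }

        void beforeWrite() throws InterruptedException {
            // if the unsaved-memory creation rate is too high, apply
            // back pressure to slow writers down and avoid an OOME
            while (autoCommitMemory > 0
                    && unsavedMemory > 2 * autoCommitMemory) {
                TimeUnit.MILLISECONDS.sleep(1); // let the flusher drain
            }
        }
    }

With a 2 MB buffer and a 150 MB/s write rate a gate like this would trip constantly, which matches the degradation described; raising AUTO_COMMIT_BUFFER_SIZE_KB widens it.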


/**
* Database setting <code>IGNORE_CATALOGS</code>
* (default: false).<br />
131 changes: 112 additions & 19 deletions h2/src/main/org/h2/mvstore/cache/CacheLongKeyLIRS.java
@@ -6,6 +6,8 @@
package org.h2.mvstore.cache;

import java.lang.ref.WeakReference;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.locks.ReentrantLock;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
@@ -602,6 +604,21 @@ private static class Segment<V> {
*/
private int stackMoveCounter;

/*
* Holds entries that were accessed by get() while another
* thread held the segment lock
*/
private final ConcurrentSkipListSet<Entry<V>> concAccess;

/*
* Serializes access to this segment
*/
private final ReentrantLock l;

/*
* Sentinel used to record misses, since ConcurrentSkipListSet
* does not allow null elements
*/
private final Entry<V> ENTRY_NULL = new Entry<>();
Contributor:
I think it would be better to convert misses to AtomicLong and avoid a lock for a null value completely. In that case this dummy value and related tricks will not be required.

Author: Makes sense. Will do.

Author:

    and avoid a lock for a null value completely

Actually there is no separate lock for this null value: most of the time the processing is done inside the lock that is currently active. But I agree that it looks a bit 'artificial'.
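
A minimal sketch of the reviewer's AtomicLong suggestion, assuming a plain queue for the deferred hits (a queue suffices because misses never enter it, so the ENTRY_NULL sentinel disappears; all names here are illustrative):

    import java.util.concurrent.ConcurrentLinkedQueue;
    import java.util.concurrent.atomic.AtomicLong;
    import java.util.concurrent.locks.ReentrantLock;

    // Misses become a lock-free counter; hits are applied under the
    // lock or deferred for the current lock holder to drain.
    class SegmentSketch<V> {
        private final AtomicLong misses = new AtomicLong();
        private long hits;
        private final ReentrantLock lock = new ReentrantLock();
        private final ConcurrentLinkedQueue<V> deferred =
                new ConcurrentLinkedQueue<>();

        V get(V value) {
            if (value == null) {
                misses.incrementAndGet(); // no lock, no sentinel
                return null;
            }
            if (lock.tryLock()) {
                try {
                    recordHit(value);
                    for (V p; (p = deferred.poll()) != null; ) {
                        recordHit(p); // hits deferred by other threads
                    }
                } finally {
                    lock.unlock();
                }
            } else {
                deferred.add(value); // the lock holder will drain this
            }
            return value;
        }

        private void recordHit(V v) {
            hits++; // the real cache would also adjust LIRS state here
        }
    }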


/**
* Create a new cache segment.
* @param maxMemory the maximum memory to use
@@ -632,6 +649,9 @@ private static class Segment<V> {
@SuppressWarnings("unchecked")
Entry<V>[] e = new Entry[len];
entries = e;

concAccess = new ConcurrentSkipListSet<>();
l = new ReentrantLock();
}

/**
@@ -710,17 +730,42 @@ private void addToMap(Entry<V> e) {
* @param e the entry
* @return the value, or null if there is no resident entry
*/
synchronized V get(Entry<V> e) {
V get(Entry<V> e) {
V value = e == null ? null : e.getValue();
if (value == null) {
// the entry was not found
// or it was a non-resident entry
misses++;
} else {
access(e);
hits++;
if (!l.tryLock()) {
Entry<V> e2 = value == null ? ENTRY_NULL : e;
concAccess.add(e2);
return value;
}
try {
if (value == null) {
// the entry was not found
// or it was a non-resident entry
misses++;
} else {
access(e);
hits++;
}

// process entries that were accessed concurrently
while (true) {
Entry<V> p = concAccess.pollFirst();
if (p == null) {
break;
}
if (p == ENTRY_NULL) {
misses++;
} else {
access(p);
hits++;
}
}

return value;
}
finally {
l.unlock();
}
return value;
}
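
For context, a rough harness for the contention scenario described in the thread: 60 threads hammering a handful of hot keys, so every get() lands on the same few segments. The Config fields match current H2 sources, but treat this as an illustrative sketch rather than a benchmark:

    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.ThreadLocalRandom;
    import java.util.concurrent.TimeUnit;
    import org.h2.mvstore.cache.CacheLongKeyLIRS;

    public class HotSegmentStress {
        public static void main(String[] args) throws Exception {
            CacheLongKeyLIRS.Config cfg = new CacheLongKeyLIRS.Config();
            cfg.maxMemory = 16 * 1024 * 1024;
            cfg.segmentCount = 1024; // high cache concurrency
            CacheLongKeyLIRS<byte[]> cache = new CacheLongKeyLIRS<>(cfg);
            for (long k = 0; k < 8; k++) {
                cache.put(k, new byte[1024], 1024); // tiny hot working set
            }
            ExecutorService pool = Executors.newFixedThreadPool(60);
            for (int t = 0; t < 60; t++) {
                pool.submit(() -> {
                    ThreadLocalRandom rnd = ThreadLocalRandom.current();
                    for (int i = 0; i < 1_000_000; i++) {
                        cache.get(rnd.nextLong(8)); // few segments, 60 threads
                    }
                });
            }
            pool.shutdown();
            pool.awaitTermination(5, TimeUnit.MINUTES);
        }
    }

With the old synchronized get(), all 60 threads serialize on the same few segment monitors; with the tryLock() path above, a blocked reader just records its access in concAccess and returns immediately.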

/**
@@ -749,9 +794,8 @@ private void access(Entry<V> e) {
V v = e.getValue();
if (v != null) {
removeFromQueue(e);
if (e.reference != null) {
if (e.value == null) {
e.value = v;
e.reference = null;
usedMemory += e.memory;
}
if (e.stackNext != null) {
@@ -787,7 +831,17 @@
* @param memory the memory used for the given entry
* @return the old value, or null if there was no resident entry
*/
synchronized V put(long key, int hash, V value, int memory) {
V put(long key, int hash, V value, int memory) {
l.lock();
try {
return putUnlocked(key, hash, value, memory);
}
finally {
l.unlock();
}
}

private V putUnlocked(long key, int hash, V value, int memory) {
Entry<V> e = find(key, hash);
boolean existed = e != null;
V old = null;
@@ -832,7 +886,17 @@ synchronized V put(long key, int hash, V value, int memory) {
* @param hash the hash
* @return the old value, or null if there was no resident entry
*/
synchronized V remove(long key, int hash) {
V remove(long key, int hash) {
l.lock();
try {
return removeUnlocked(key, hash);
}
finally {
l.unlock();
}
}

private V removeUnlocked(long key, int hash) {
int index = hash & mask;
Entry<V> e = entries[index];
if (e == null) {
@@ -897,7 +961,6 @@ private void evictBlock() {
Entry<V> e = queue.queuePrev;
usedMemory -= e.memory;
removeFromQueue(e);
e.reference = new WeakReference<>(e.value);
e.value = null;
addToQueue(queue2, e);
// the size of the non-resident-cold entries needs to be limited
@@ -1031,7 +1094,17 @@ private void removeFromQueue(Entry<V> e) {
* @param nonResident true for non-resident entries
* @return the key list
*/
synchronized List<Long> keys(boolean cold, boolean nonResident) {
List<Long> keys(boolean cold, boolean nonResident) {
l.lock();
try {
return keysUnlocked(cold, nonResident);
}
finally {
l.unlock();
}
}

private List<Long> keysUnlocked(boolean cold, boolean nonResident) {
ArrayList<Long> keys = new ArrayList<>();
if (cold) {
Entry<V> start = nonResident ? queue2 : queue;
@@ -1053,7 +1126,17 @@ synchronized List<Long> keys(boolean cold, boolean nonResident) {
*
* @return the set of keys
*/
synchronized Set<Long> keySet() {
Set<Long> keySet() {
l.lock();
try {
return keySetUnlocked();
}
finally {
l.unlock();
}
}

private Set<Long> keySetUnlocked() {
HashSet<Long> set = new HashSet<>();
for (Entry<V> e = stack.stackNext; e != stack; e = e.stackNext) {
set.add(e.key);
@@ -1086,7 +1169,7 @@ void setMaxMemory(long maxMemory) {
*
* @param <V> the value type
*/
static class Entry<V> {
static class Entry<V> implements Comparable<Entry<V>> {

/**
* The key.
@@ -1148,10 +1231,15 @@ static class Entry<V> {
this.key = key;
this.memory = memory;
this.value = value;
if (value != null) {
this.reference = new WeakReference<>(value);
}
}

Entry(Entry<V> old) {
this(old.key, old.value, old.memory);
this.key = old.key;
this.memory = old.memory;
this.value = old.value;
this.reference = old.reference;
this.topMove = old.topMove;
}
@@ -1166,12 +1254,17 @@ boolean isHot() {
}

V getValue() {
return value == null ? reference.get() : value;
final V v = value;
return v == null ? reference.get() : v;
}
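
The two-line getValue() change above is a read-once guard: if eviction clears value between the null check and the return, the old version could return null even though the weak reference still held the object. A stand-in sketch of the race (Holder approximates Entry; the volatile is only for the example):

    import java.lang.ref.WeakReference;

    class Holder<V> {
        volatile V value;            // cleared by eviction (not shown)
        WeakReference<V> reference;  // always set in this sketch

        V getValueRacy() {
            // two reads: the check may see non-null while the
            // return re-reads the field after eviction cleared it
            return value == null ? reference.get() : value;
        }

        V getValueSafe() {
            final V v = value;       // read the field exactly once
            return v == null ? reference.get() : v;
        }
    }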

int getMemory() {
return value == null ? 0 : memory;
}

public int compareTo(Entry<V> tgt) {
return key == tgt.key ? 0 : key < tgt.key ? -1 : 1;

Contributor: Add @Override, and use Long.compare().

Author: Agreed.
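
The agreed form would be:

    @Override
    public int compareTo(Entry<V> tgt) {
        return Long.compare(key, tgt.key);
    }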

}
}

3 changes: 3 additions & 0 deletions h2/src/main/org/h2/mvstore/db/Store.java
@@ -123,6 +123,9 @@ public Store(Database db) {
// otherwise background thread would compete for store lock
// with maps opening procedure
builder.autoCommitDisabled();

builder.cacheConcurrency(db.getSettings().cacheConcurrency);
builder.autoCommitBufferSize(db.getSettings().autoCommitBufferSize);
}
this.encrypted = encrypted;
try {
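
For reference, the same two knobs can be set when opening an MVStore directly through its builder; a hedged sketch (method names as used by this PR in Store.java, values illustrative):

    import org.h2.mvstore.MVStore;

    public class OpenStoreExample {
        public static void main(String[] args) {
            // The knobs Store.java forwards from DbSettings above.
            MVStore store = new MVStore.Builder()
                    .fileName("cache.mv.db")
                    .cacheConcurrency(1024)          // read-cache segments
                    .autoCommitBufferSize(64 * 1024) // in KB, i.e. 64 MB
                    .open();
            store.close();
        }
    }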