Skip to content

Commit

Permalink
fix: reduce the locking of methods reading indexes
Browse files Browse the repository at this point in the history
closes: #5973

Signed-off-by: Steve Hawkins <shawkins@redhat.com>
  • Loading branch information
shawkins committed May 1, 2024
1 parent b435843 commit 348e52b
Show file tree
Hide file tree
Showing 3 changed files with 69 additions and 128 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
* Fix #5866: Addressed cycle in crd generation with Java 19+ and ZonedDateTime

#### Improvements
* Fix #5973: CacheImpl locking for reading indexes (Cache.byIndex|indexKeys|index) was reduced
* Fix #5878: (java-generator) Add implements Editable for extraAnnotations
* Fix #5878: (java-generator) Update documentation to include dependencies
* Fix #5867: (crd-generator) Imply schemaFrom via JsonFormat shape (SchemaFrom takes precedence)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
Expand All @@ -37,6 +38,9 @@

/**
* It basically saves and indexes all the entries.
* <br>
* Index reads {@link #byIndex(String, String)}, {@link #indexKeys(String, String)}, {@link #index(String, HasMetadata)}
* are not globally locked and thus may not be fully consistent with the current state
*
* @param <T> type for cache object
*/
Expand All @@ -48,7 +52,6 @@ public class CacheImpl<T extends HasMetadata> implements Cache<T> {
private final Map<String, Function<T, List<String>>> indexers = Collections.synchronizedMap(new HashMap<>());

// items stores object instances
// @GuardedBy("getLockObject")
private ItemStore<T> items;

// indices stores objects' key by their indices
Expand All @@ -74,11 +77,13 @@ public void setItemStore(ItemStore<T> items) {
*/
@Override
public Map<String, Function<T, List<String>>> getIndexers() {
return Collections.unmodifiableMap(indexers);
synchronized (indexers) {
return Collections.unmodifiableMap(indexers);
}
}

@Override
public void addIndexers(Map<String, Function<T, List<String>>> indexersNew) {
public synchronized void addIndexers(Map<String, Function<T, List<String>>> indexersNew) {
Set<String> intersection = new HashSet<>(indexers.keySet());
intersection.retainAll(indexersNew.keySet());
if (!intersection.isEmpty()) {
Expand All @@ -96,15 +101,12 @@ public void addIndexers(Map<String, Function<T, List<String>>> indexersNew) {
* @param obj the object
* @return the old object
*/
public T put(T obj) {
public synchronized T put(T obj) {
if (obj == null) {
return null;
}
String key = getKey(obj);
T oldObj;
synchronized (getLockObject()) {
oldObj = this.items.put(key, obj);
}
T oldObj = this.items.put(key, obj);
this.updateIndices(oldObj, obj, key);
return oldObj;
}
Expand All @@ -115,14 +117,11 @@ public T put(T obj) {
* @param obj object
* @return the old object
*/
public T remove(T obj) {
public synchronized T remove(T obj) {
String key = getKey(obj);
T old;
synchronized (getLockObject()) {
old = this.items.remove(key);
}
T old = this.items.remove(key);
if (old != null) {
this.deleteFromIndices(old, key);
this.updateIndices(old, null, key);
}
return old;
}
Expand All @@ -134,9 +133,7 @@ public T remove(T obj) {
*/
@Override
public List<String> listKeys() {
synchronized (getLockObject()) {
return this.items.keySet().collect(Collectors.toList());
}
return this.items.keySet().collect(Collectors.toList());
}

/**
Expand All @@ -156,10 +153,8 @@ public T get(T obj) {
*/
@Override
public String getKey(T obj) {
synchronized (getLockObject()) {
String result = this.items.getKey(obj);
return result == null ? "" : result;
}
String result = this.items.getKey(obj);
return result == null ? "" : result;
}

/**
Expand All @@ -169,9 +164,7 @@ public String getKey(T obj) {
*/
@Override
public List<T> list() {
synchronized (getLockObject()) {
return this.items.values().collect(Collectors.toList());
}
return this.items.values().collect(Collectors.toList());
}

/**
Expand All @@ -182,9 +175,7 @@ public List<T> list() {
*/
@Override
public T getByKey(String key) {
synchronized (getLockObject()) {
return this.items.get(key);
}
return this.items.get(key);
}

/**
Expand All @@ -200,7 +191,7 @@ public List<T> index(String indexName, T obj) {
if (indexFunc == null) {
throw new IllegalArgumentException(String.format("index %s doesn't exist!", indexName));
}
Map<String, Set<String>> index = this.indices.get(indexName);
Map<String, Set<String>> index = getIndex(indexName);
if (index.isEmpty()) {
return new ArrayList<>();
}
Expand All @@ -215,23 +206,23 @@ public List<T> index(String indexName, T obj) {
returnKeySet.addAll(set);
}

return getItems(returnKeySet);
}

private List<T> getItems(Set<String> returnKeySet) {
List<T> items = new ArrayList<>(returnKeySet.size());
for (String absoluteKey : returnKeySet) {
T item;
synchronized (getLockObject()) {
item = this.items.get(absoluteKey);
}
if (item != null) {
items.add(item);
}
Optional.ofNullable(this.items.get(absoluteKey)).ifPresent(items::add);
}
return items;
}

private void checkContainsIndex(String indexName) {
if (!this.indexers.containsKey(indexName)) {
private Map<String, Set<String>> getIndex(String indexName) {
Map<String, Set<String>> index = this.indices.get(indexName);
if (index == null) {
throw new IllegalArgumentException(String.format("index %s doesn't exist!", indexName));
}
return index;
}

/**
Expand All @@ -243,13 +234,8 @@ private void checkContainsIndex(String indexName) {
*/
@Override
public List<String> indexKeys(String indexName, String indexKey) {
checkContainsIndex(indexName);
Map<String, Set<String>> index = this.indices.get(indexName);
if (index == null) {
return new ArrayList<>();
}
Set<String> set = index.getOrDefault(indexKey, Collections.emptySet());
return new ArrayList<>(set);
Map<String, Set<String>> index = getIndex(indexName);
return new ArrayList<>(index.getOrDefault(indexKey, Collections.emptySet()));
}

/**
Expand All @@ -261,26 +247,8 @@ public List<String> indexKeys(String indexName, String indexKey) {
*/
@Override
public List<T> byIndex(String indexName, String indexKey) {
checkContainsIndex(indexName);
Map<String, Set<String>> index = this.indices.get(indexName);
if (index == null) {
return new ArrayList<>();
}
Set<String> set = index.get(indexKey);
if (set == null) {
return new ArrayList<>();
}
List<T> items = new ArrayList<>(set.size());
for (String key : set) {
T item;
synchronized (getLockObject()) {
item = this.items.get(key);
}
if (item != null) {
items.add(item);
}
}
return items;
Map<String, Set<String>> index = getIndex(indexName);
return getItems(index.getOrDefault(indexKey, Collections.emptySet()));
}

/**
Expand All @@ -292,83 +260,51 @@ public List<T> byIndex(String indexName, String indexKey) {
* @param newObj new object
* @param key the key
*/
void updateIndices(T oldObj, T newObj, String key) {
if (oldObj != null) {
deleteFromIndices(oldObj, key);
}

synchronized (indexers) {
for (Map.Entry<String, Function<T, List<String>>> indexEntry : indexers.entrySet()) {
String indexName = indexEntry.getKey();
Function<T, List<String>> indexFunc = indexEntry.getValue();
Map<String, Set<String>> index = this.indices.get(indexName);
if (index != null) {
updateIndex(key, newObj, indexFunc, index);
private void updateIndices(T oldObj, T newObj, String key) {
for (Map.Entry<String, Function<T, List<String>>> indexEntry : indexers.entrySet()) {
String indexName = indexEntry.getKey();
Function<T, List<String>> indexFunc = indexEntry.getValue();
Map<String, Set<String>> index = this.indices.get(indexName);
if (index != null) {
if (oldObj != null) {
updateIndex(key, oldObj, indexFunc, index, true);
}
if (newObj != null) {
updateIndex(key, newObj, indexFunc, index, false);
}
}
}
}

private void updateIndex(String key, T newObj, Function<T, List<String>> indexFunc, Map<String, Set<String>> index) {
List<String> indexValues = indexFunc.apply(newObj);
private void updateIndex(String key, T obj, Function<T, List<String>> indexFunc, Map<String, Set<String>> index,
boolean remove) {
List<String> indexValues = indexFunc.apply(obj);
if (indexValues != null && !indexValues.isEmpty()) {
for (String indexValue : indexValues) {
if (indexValue != null) {
if (remove) {
Optional.ofNullable(index.get(indexValue)).ifPresent(v -> v.remove(key));
} else {
index.computeIfAbsent(indexValue, k -> ConcurrentHashMap.newKeySet()).add(key);
}
}
}
}

/**
* Removes the object from each of the managed indexes.
*
* It is intended to be called from a function that already has a lock on the cache.
*
* @param oldObj the old object
* @param key the key
*/
private void deleteFromIndices(T oldObj, String key) {
synchronized (indexers) {
for (Map.Entry<String, Function<T, List<String>>> indexEntry : this.indexers.entrySet()) {
Function<T, List<String>> indexFunc = indexEntry.getValue();
List<String> indexValues = indexFunc.apply(oldObj);
if (indexValues == null || indexValues.isEmpty()) {
continue;
}

Map<String, Set<String>> index = this.indices.get(indexEntry.getKey());
if (index == null) {
continue;
}
for (String indexValue : indexValues) {
if (indexValue != null) {
Set<String> indexSet = index.get(indexValue);
if (indexSet != null) {
indexSet.remove(key);
}
}
}
}
}
}

/**
* Add index func.
*
* @param indexName the index name
* @param indexFunc the index func
*/
public CacheImpl<T> addIndexFunc(String indexName, Function<T, List<String>> indexFunc) {
public synchronized CacheImpl<T> addIndexFunc(String indexName, Function<T, List<String>> indexFunc) {
if (this.indices.containsKey(indexName)) {
throw new IllegalArgumentException("Indexer conflict: " + indexName);
}
ConcurrentMap<String, Set<String>> index = new ConcurrentHashMap<>();
synchronized (indexers) {
this.indices.put(indexName, index);
this.indexers.put(indexName, indexFunc);
this.indices.put(indexName, index);
this.indexers.put(indexName, indexFunc);

synchronized (getLockObject()) {
items.values().forEach(v -> updateIndex(getKey(v), v, indexFunc, index));
}
}
items.values().forEach(v -> updateIndex(getKey(v), v, indexFunc, index, false));
return this;
}

Expand Down Expand Up @@ -431,19 +367,17 @@ public static List<String> metaNamespaceIndexFunc(Object obj) {
}

@Override
public void removeIndexer(String name) {
public synchronized void removeIndexer(String name) {
this.indices.remove(name);
this.indexers.remove(name);
}

public boolean isFullState() {
synchronized (getLockObject()) {
return items.isFullState();
}
return items.isFullState();
}

public Object getLockObject() {
return this.items;
return this;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -178,9 +178,15 @@ void parallelStore() throws InterruptedException {
String expectedIndexKey = "pods/test-pod/" + i;
assertThat(cache.getKey(pod)).isEqualTo("test-namespace/test-pod");
cache.put(pod);
assertThat(cache.byIndex(keyIndex, expectedIndexKey)).isNotNull();
assertThat(cache.byIndex(Cache.NAMESPACE_INDEX, "test-namespace"))
.isNotEmpty();
assertThat(cache.byIndex(keyIndex, expectedIndexKey)).isNotEmpty();
while (cache.byIndex(Cache.NAMESPACE_INDEX, "test-namespace").isEmpty()) {
try {
// there's only 1 entry and it's being added by another thread, so it may
// not be visible yet
Thread.sleep(5);
} catch (InterruptedException e) {
}
}
latch.countDown();
})
.forEach(ForkJoinPool.commonPool()::execute);
Expand Down

0 comments on commit 348e52b

Please sign in to comment.