Skip to content

Commit 5f43856

Browse files
authored
Merge pull request #3130 from nnares/issue-3097
issue-3097 : re-writting getAll() impl
2 parents cef93dc + c4ca535 commit 5f43856

11 files changed

Lines changed: 762 additions & 35 deletions

File tree

clustered/ehcache-client/src/main/java/org/ehcache/clustered/client/internal/store/ClusteredStore.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -554,6 +554,11 @@ public Map<K, ValueHolder<V>> bulkComputeIfAbsent(final Set<? extends K> keys, f
554554
}
555555
}
556556

557+
@Override
558+
public Iterable<? extends Map.Entry<? extends K, ? extends ValueHolder<V>>> bulkComputeIfAbsentAndFault(Iterable<? extends K> keys, Function<Iterable<? extends K>, Iterable<? extends Map.Entry<? extends K, ? extends V>>> mappingFunction) throws StoreAccessException {
559+
return bulkComputeIfAbsent((Set<? extends K>) keys,mappingFunction).entrySet();
560+
}
561+
557562
@Override
558563
public List<CacheConfigurationChangeListener> getConfigurationChangeListeners() {
559564
// TODO: Make appropriate ServerStoreProxy call

ehcache-core/src/main/java/org/ehcache/core/spi/store/tiering/AuthoritativeTier.java

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import org.ehcache.spi.service.ServiceConfiguration;
2525

2626
import java.util.Collection;
27+
import java.util.Map;
2728
import java.util.function.Function;
2829

2930
/**
@@ -75,7 +76,25 @@ public interface AuthoritativeTier<K, V> extends Store<K, V> {
7576
*/
7677
void setInvalidationValve(InvalidationValve valve);
7778

79+
7880
/**
81+
* Bulk method to compute a value for every key passed in the {@link Iterable} <code>keys</code> argument using the <code>mappingFunction</code>
82+
* to compute the value.
83+
* <p>
84+
* The function takes an {@link Iterable} of {@link java.util.Map.Entry} key/value pairs, where each entry's value is its currently stored value
85+
* for each key that is not mapped in the store. It is expected that the function should return an {@link Iterable} of {@link java.util.Map.Entry}
86+
* key/value pairs containing an entry for each key that was passed to it.
87+
* <p>
88+
* Note: This method guarantees atomicity of computations for each individual key in {@code keys}. Implementations may choose to provide coarser grained atomicity.
89+
*
90+
* @param keys the keys to compute a new value for, if they're not in the store.
91+
* @param mappingFunction the function that generates new values.
92+
* @return a {@code Map} of key/value pairs for each key in <code>keys</code> to the previously missing value.
93+
* @throws StoreAccessException when a failure occurs when accessing the store
94+
*/
95+
Iterable<? extends Map.Entry<? extends K,? extends ValueHolder<V>>> bulkComputeIfAbsentAndFault(Iterable<? extends K> keys, Function<Iterable<? extends K>, Iterable<? extends Map.Entry<? extends K,? extends V>>> mappingFunction) throws StoreAccessException;
96+
97+
/**
7998
* Invalidation valve, that is the mechanism through which an {@link AuthoritativeTier} can request invalidations
8099
* from the {@link CachingTier}.
81100
*/

ehcache-core/src/main/java/org/ehcache/core/spi/store/tiering/CachingTier.java

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import org.ehcache.spi.service.ServiceConfiguration;
2626

2727
import java.util.Collection;
28+
import java.util.Map;
2829
import java.util.Set;
2930
import java.util.function.Function;
3031

@@ -107,6 +108,22 @@ public interface CachingTier<K, V> extends ConfigurationChangeSupport {
107108
*/
108109
void setInvalidationListener(InvalidationListener<K, V> invalidationListener);
109110

111+
/**
112+
* Bulk method which takes {@link Set} of <code>keys</code> as argument and returns a {@link Map} of its mapped value from CachingTier,
113+
* For all the missing entries from CachingTier using <code>mappingFunction</code> to compute its value
114+
* <p>
115+
* The function takes an {@link Iterable} of missing keys, where each entry's mapping is missing from CachingTier.
116+
* It is expected that the function should return an {@link Iterable} of {@link java.util.Map.Entry} key/value pairs containing an entry for each key that was passed to it.
117+
* <p>
118+
* Note: This method guarantees atomicity of computations for each individual key in {@code keys}. Implementations may choose to provide coarser grained atomicity.
119+
*
120+
* @param keys the keys to compute a new value for, if they're not in the store.
121+
* @param mappingFunction the function that generates new values.
122+
* @return a {@code Map} of key/value pairs for each key in <code>keys</code>.
123+
* @throws StoreAccessException when a failure occurs when accessing the store.
124+
*/
125+
Map<K, Store.ValueHolder<V>> bulkGetOrComputeIfAbsent(Iterable<? extends K> keys, Function<Set<? extends K>, Iterable<? extends Map.Entry<? extends K, ? extends Store.ValueHolder<V>>>> mappingFunction) throws StoreAccessException;
126+
110127
/**
111128
* Caching tier invalidation listener.
112129
* <p>

ehcache-impl/src/main/java/org/ehcache/impl/internal/store/basic/NopStore.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -199,4 +199,13 @@ public Map<K, ValueHolder<V>> bulkComputeIfAbsent(Set<? extends K> keys, Functio
199199
}
200200
return map;
201201
}
202+
203+
@Override
204+
public Iterable<? extends Map.Entry<? extends K, ? extends ValueHolder<V>>> bulkComputeIfAbsentAndFault(Iterable<? extends K> keys, Function<Iterable<? extends K>, Iterable<? extends Map.Entry<? extends K, ? extends V>>> mappingFunction) throws StoreAccessException {
205+
Map<K, ValueHolder<V>> map = new HashMap<>();
206+
for(K key : keys) {
207+
map.put(key, null);
208+
}
209+
return map.entrySet();
210+
}
202211
}

ehcache-impl/src/main/java/org/ehcache/impl/internal/store/heap/OnHeapStore.java

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,8 @@
9292
import java.util.function.Consumer;
9393
import java.util.function.Function;
9494
import java.util.function.Supplier;
95+
import java.util.stream.Collectors;
96+
import java.util.stream.StreamSupport;
9597

9698
import static org.ehcache.config.Eviction.noAdvice;
9799
import static org.ehcache.core.config.ExpiryUtils.isExpiryDurationInfinite;
@@ -956,6 +958,40 @@ public void setInvalidationListener(InvalidationListener<K, V> providedInvalidat
956958
};
957959
}
958960

961+
@Override
962+
public Map<K, Store.ValueHolder<V>> bulkGetOrComputeIfAbsent(Iterable<? extends K> keys, Function<Set<? extends K>, Iterable<? extends Entry<? extends K, ? extends ValueHolder<V>>>> mappingFunction) throws StoreAccessException {
963+
Map<K, ValueHolder<V>> result = new HashMap<>();
964+
Set<K> missingKeys = new HashSet<>();
965+
966+
for (K key : keys) {
967+
ValueHolder<V> cachingFetch = get(key);
968+
if (null == cachingFetch) {
969+
missingKeys.add(key);
970+
} else {
971+
result.put(key, cachingFetch);
972+
}
973+
}
974+
975+
try {
976+
List<? extends Entry<? extends K, ? extends ValueHolder<V>>> fetchedEntries =
977+
StreamSupport.stream(mappingFunction.apply(missingKeys).spliterator(), false)
978+
.filter(e -> missingKeys.contains(e.getKey()))
979+
.collect(Collectors.toList());
980+
981+
long availableSize = capacity - result.size();
982+
for (Entry<? extends K, ? extends ValueHolder<V>> entry : fetchedEntries) {
983+
// populating AuthoritativeTier entries to TieredStore for getAll()
984+
if(availableSize-- > 0){
985+
getOrComputeIfAbsent(entry.getKey(), keyParam -> entry.getValue());
986+
}
987+
result.put(entry.getKey(), entry.getValue());
988+
}
989+
return result;
990+
} catch (RuntimeException re) {
991+
throw new StoreAccessException(re);
992+
}
993+
}
994+
959995
@Override
960996
public void invalidateAllWithHash(long hash) {
961997
invalidateAllWithHashObserver.begin();

ehcache-impl/src/main/java/org/ehcache/impl/internal/store/offheap/AbstractOffHeapStore.java

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -846,6 +846,25 @@ public ValueHolder<V> getAndFault(K key) throws StoreAccessException {
846846
return mappedValue;
847847
}
848848

849+
public Iterable<? extends Map.Entry<? extends K, ? extends ValueHolder<V>>> bulkComputeIfAbsentAndFault(Iterable<? extends K> keys, Function<Iterable<? extends K>, Iterable<? extends Map.Entry<? extends K, ? extends V>>> mappingFunction) throws StoreAccessException {
850+
Map<K, ValueHolder<V>> result = new HashMap<>();
851+
for (K key : keys) {
852+
checkKey(key);
853+
Function<K, V> function = k -> {
854+
java.util.Iterator<? extends Map.Entry<? extends K, ? extends V>> iterator = mappingFunction.apply(Collections.singleton(k)).iterator();
855+
Map.Entry<? extends K, ? extends V> result1 = iterator.next();
856+
if (result1 != null) {
857+
checkKey(result1.getKey());
858+
return result1.getValue();
859+
} else {
860+
return null;
861+
}
862+
};
863+
result.put(key, computeIfAbsentAndFault(key, function));
864+
}
865+
return result.entrySet();
866+
}
867+
849868
@Override
850869
public ValueHolder<V> computeIfAbsentAndFault(K key, Function<? super K, ? extends V> mappingFunction) throws StoreAccessException {
851870
return internalComputeIfAbsent(key, mappingFunction, true, true);

ehcache-impl/src/main/java/org/ehcache/impl/internal/store/tiering/CompoundCachingTier.java

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,8 @@
3636
import java.util.ArrayList;
3737
import java.util.Collection;
3838
import java.util.EnumSet;
39+
import java.util.HashMap;
40+
import java.util.HashSet;
3941
import java.util.List;
4042
import java.util.Map;
4143
import java.util.Set;
@@ -200,6 +202,38 @@ public void setInvalidationListener(InvalidationListener<K, V> invalidationListe
200202
lower.setInvalidationListener(invalidationListener);
201203
}
202204

205+
@Override
206+
public Map<K, Store.ValueHolder<V>> bulkGetOrComputeIfAbsent(Iterable<? extends K> keys, Function<Set<? extends K>, Iterable<? extends Map.Entry<? extends K, ? extends Store.ValueHolder<V>>>> mappingFunction) throws StoreAccessException {
207+
try {
208+
return higher.bulkGetOrComputeIfAbsent(keys, keyParam -> {
209+
try {
210+
Map<K, Store.ValueHolder<V>> result = new HashMap<>();
211+
Set<K> missingKeys = new HashSet<>();
212+
213+
for (K key : keys) {
214+
Store.ValueHolder<V> cachingFetch = lower.getAndRemove(key);
215+
if (null == cachingFetch) {
216+
missingKeys.add(key);
217+
} else {
218+
result.put(key, cachingFetch);
219+
}
220+
}
221+
222+
Iterable<? extends Map.Entry<? extends K, ? extends Store.ValueHolder<V>>> fetchedEntries = mappingFunction.apply(missingKeys);
223+
for (Map.Entry<? extends K, ? extends Store.ValueHolder<V>> entry : fetchedEntries) {
224+
result.put(entry.getKey(), entry.getValue());
225+
}
226+
return result.entrySet();
227+
228+
} catch (StoreAccessException cae) {
229+
throw new ComputationException(cae);
230+
}
231+
});
232+
} catch (ComputationException ce) {
233+
throw ce.getStoreAccessException();
234+
}
235+
}
236+
203237
@Override
204238
public List<CacheConfigurationChangeListener> getConfigurationChangeListeners() {
205239
List<CacheConfigurationChangeListener> listeners = new ArrayList<>();

ehcache-impl/src/main/java/org/ehcache/impl/internal/store/tiering/TieredStore.java

Lines changed: 23 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,8 @@
3737
import java.util.ArrayList;
3838
import java.util.Arrays;
3939
import java.util.Collection;
40+
import java.util.Collections;
41+
import java.util.HashMap;
4042
import java.util.HashSet;
4143
import java.util.List;
4244
import java.util.Map;
@@ -356,13 +358,18 @@ public Map<K, ValueHolder<V>> bulkCompute(Set<? extends K> keys, Function<Iterab
356358
}
357359

358360
@Override
359-
public Map<K, ValueHolder<V>> bulkComputeIfAbsent(Set<? extends K> keys, Function<Iterable<? extends K>, Iterable<? extends Map.Entry<? extends K, ? extends V>>> mappingFunction) throws StoreAccessException {
361+
public Map<K, Store.ValueHolder<V>> bulkComputeIfAbsent(Set<? extends K> keys, Function<Iterable<? extends K>, Iterable<? extends Map.Entry<? extends K, ? extends V>>> mappingFunction) throws StoreAccessException {
362+
360363
try {
361-
return authoritativeTier.bulkComputeIfAbsent(keys, mappingFunction);
362-
} finally {
363-
for (K key : keys) {
364-
cachingTier().invalidate(key);
365-
}
364+
return cachingTier().bulkGetOrComputeIfAbsent(keys, missingKeys -> {
365+
try {
366+
return authoritativeTier.bulkComputeIfAbsentAndFault(missingKeys, mappingFunction);
367+
} catch (StoreAccessException cae) {
368+
throw new StorePassThroughException(cae);
369+
}
370+
});
371+
} catch (StoreAccessException ce) {
372+
return handleStoreAccessException(ce);
366373
}
367374
}
368375

@@ -379,7 +386,7 @@ private CachingTier<K, V> cachingTier() {
379386
return cachingTierRef.get();
380387
}
381388

382-
private ValueHolder<V> handleStoreAccessException(StoreAccessException ce) throws StoreAccessException {
389+
private <R> R handleStoreAccessException(StoreAccessException ce) throws StoreAccessException {
383390
Throwable cause = ce.getCause();
384391
if (cause instanceof StorePassThroughException) {
385392
throw (StoreAccessException) cause.getCause();
@@ -604,6 +611,15 @@ public void setInvalidationListener(final InvalidationListener<K, V> invalidatio
604611
// noop
605612
}
606613

614+
@Override
615+
public Map<K, Store.ValueHolder<V>> bulkGetOrComputeIfAbsent(Iterable<? extends K> keys, Function<Set<? extends K>, Iterable<? extends Map.Entry<? extends K, ? extends ValueHolder<V>>>> mappingFunction) throws StoreAccessException {
616+
Map<K, ValueHolder<V>> map = new HashMap<>();
617+
for(K key : keys) {
618+
map.put(key, null);
619+
}
620+
return map;
621+
}
622+
607623
@Override
608624
public void invalidateAllWithHash(long hash) {
609625
// noop

0 commit comments

Comments
 (0)