Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,11 @@
package org.ehcache.spi.loaderwriter;

import org.ehcache.spi.service.Service;

import java.util.function.Consumer;
/**
* A {@link Service} that provides write-behind functionality.
* <p>
* A {@code CacheManager} will use the {@link #createWriteBehindLoaderWriter(org.ehcache.spi.loaderwriter.CacheLoaderWriter, org.ehcache.spi.loaderwriter.WriteBehindConfiguration)}
* A {@code CacheManager} will use the {Consumer, @link #createWriteBehindLoaderWriter(org.ehcache.spi.loaderwriter.CacheLoaderWriter, org.ehcache.spi.loaderwriter.WriteBehindConfiguration)}
* method to create write-behind instances for each {@code Cache} it manages
* that carries a write-behind configuration.
*/
Expand All @@ -30,14 +30,15 @@ public interface WriteBehindProvider extends Service {
* Creates write-behind decorated {@link CacheLoaderWriter} according to the
* given configuration.
*
* @param keyCleanUpMethod cleanup Method to clean failures
* @param cacheLoaderWriter the {@code CacheLoaderWriter} to decorate
* @param configuration the write-behind configuration
* @param <K> the key type for the loader writer
* @param <V> the value type for the loader writer
*
* @return the write-behind decorated loader writer
*/
<K, V> CacheLoaderWriter<K, V> createWriteBehindLoaderWriter(CacheLoaderWriter<K, V> cacheLoaderWriter, WriteBehindConfiguration<?> configuration);
<K, V> CacheLoaderWriter<K, V> createWriteBehindLoaderWriter(Consumer<K> keyCleanUpMethod, CacheLoaderWriter<K, V> cacheLoaderWriter, WriteBehindConfiguration<?> configuration);

/**
* Releases a write-behind decorator when the associated {@link org.ehcache.Cache Cache}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.ehcache.impl.internal.loaderwriter.writebehind.operations.SingleOperation;
import org.ehcache.impl.internal.loaderwriter.writebehind.operations.WriteOperation;
import org.ehcache.impl.internal.loaderwriter.writebehind.operations.WriteAllOperation;
import org.ehcache.spi.loaderwriter.BulkCacheWritingException;
import org.ehcache.spi.loaderwriter.CacheLoaderWriter;
import org.ehcache.spi.loaderwriter.WriteBehindConfiguration;
import org.ehcache.spi.loaderwriter.WriteBehindConfiguration.BatchingConfiguration;
Expand All @@ -45,6 +46,7 @@
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.function.Consumer;

import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static org.ehcache.impl.internal.executor.ExecutorUtil.shutdown;
Expand Down Expand Up @@ -72,8 +74,8 @@ public class BatchingLocalHeapWriteBehindQueue<K, V> extends AbstractWriteBehind
private final boolean coalescing;

private volatile Batch openBatch;

public BatchingLocalHeapWriteBehindQueue(ExecutionService executionService, String defaultThreadPool, WriteBehindConfiguration<?> config, CacheLoaderWriter<K, V> cacheLoaderWriter) {
private final Consumer<K> keyCleanUpMethod;
public BatchingLocalHeapWriteBehindQueue(Consumer<K> keyCleanUpMethod, ExecutionService executionService, String defaultThreadPool, WriteBehindConfiguration<?> config, CacheLoaderWriter<K, V> cacheLoaderWriter) {
super(cacheLoaderWriter);
this.cacheLoaderWriter = cacheLoaderWriter;
BatchingConfiguration batchingConfig = config.getBatchingConfiguration();
Expand All @@ -91,6 +93,7 @@ public BatchingLocalHeapWriteBehindQueue(ExecutionService executionService, Stri
} else {
this.scheduledExecutor = executionService.getScheduledExecutor(config.getThreadPoolAlias());
}
this.keyCleanUpMethod = keyCleanUpMethod;
}

@Override
Expand Down Expand Up @@ -188,25 +191,30 @@ public boolean add(SingleOperation<K, V> operation) {
}

@Override
@SuppressWarnings("unchecked")
public void run() {
try {
List<BatchOperation<K, V>> batches = createMonomorphicBatches(operations());
// execute the batch operations
for (BatchOperation<K, V> batch : batches) {
List<BatchOperation<K, V>> batches = createMonomorphicBatches(operations());
// execute the batch operations
for (BatchOperation<K, V> batch : batches) {
try {
try {
batch.performOperation(cacheLoaderWriter);
} catch (Exception e) {
LOGGER.warn("Exception while bulk processing in write behind queue", e);
} finally {
try {
for (K key : batch.getKeys()) {
latest.remove(key);
}
} finally {
LOGGER.debug("Cancelling batch expiry task");
expireTask.cancel(false);
}
}
}
} finally {
try {
for (SingleOperation<K, V> op : operations()) {
latest.remove(op.getKey(), op);
catch (Exception ex) {
for (K key : batch.getKeys()) {
keyCleanUpMethod.accept(key);
}
} finally {
LOGGER.debug("Cancelling batch expiry task");
expireTask.cancel(false);
LOGGER.warn("Exception while bulk processing in write behind queue", ex);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Consumer;

import static org.ehcache.impl.internal.executor.ExecutorUtil.shutdown;

Expand All @@ -43,8 +44,9 @@ public class NonBatchingLocalHeapWriteBehindQueue<K, V> extends AbstractWriteBeh
private final ConcurrentMap<K, SingleOperation<K, V>> latest = new ConcurrentHashMap<>();
private final BlockingQueue<Runnable> executorQueue;
private final ExecutorService executor;
private final Consumer<K> keyCleanUpMethod;

public NonBatchingLocalHeapWriteBehindQueue(ExecutionService executionService, String defaultThreadPool, WriteBehindConfiguration<?> config, CacheLoaderWriter<K, V> cacheLoaderWriter) {
public NonBatchingLocalHeapWriteBehindQueue(Consumer<K> keyCleanUpMethod, ExecutionService executionService, String defaultThreadPool, WriteBehindConfiguration<?> config, CacheLoaderWriter<K, V> cacheLoaderWriter) {
super(cacheLoaderWriter);
this.cacheLoaderWriter = cacheLoaderWriter;
this.executorQueue = new LinkedBlockingQueue<>(config.getMaxQueueSize());
Expand All @@ -53,25 +55,29 @@ public NonBatchingLocalHeapWriteBehindQueue(ExecutionService executionService, S
} else {
this.executor = executionService.getOrderedExecutor(config.getThreadPoolAlias(), executorQueue);
}
this.keyCleanUpMethod = keyCleanUpMethod;
}

@Override
protected SingleOperation<K, V> getOperation(K key) {

return latest.get(key);
}

@Override
@SuppressWarnings("unchecked")
protected void addOperation(final SingleOperation<K, V> operation) {
latest.put(operation.getKey(), operation);

submit(() -> {
try {
operation.performOperation(cacheLoaderWriter);
try {
operation.performOperation(cacheLoaderWriter);
} finally {
latest.remove(operation.getKey(), operation);
}
} catch (Exception e) {
keyCleanUpMethod.accept(operation.getKey());
LOGGER.warn("Exception while processing key '{}' write behind queue : {}", operation.getKey(), e);
} finally {
latest.remove(operation.getKey(), operation);
}
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Consumer;

import org.ehcache.spi.loaderwriter.CacheLoaderWriter;
import org.ehcache.spi.loaderwriter.WriteBehindConfiguration;
Expand All @@ -35,13 +36,13 @@ public class StripedWriteBehind<K, V> implements WriteBehind<K, V> {

private final List<WriteBehind<K, V>> stripes = new ArrayList<>();

public StripedWriteBehind(ExecutionService executionService, String defaultThreadPool, WriteBehindConfiguration<?> config, CacheLoaderWriter<K, V> cacheLoaderWriter) {
public StripedWriteBehind(Consumer<K> keyCleanUpMethod, ExecutionService executionService, String defaultThreadPool, WriteBehindConfiguration<?> config, CacheLoaderWriter<K, V> cacheLoaderWriter) {
int writeBehindConcurrency = config.getConcurrency();
for (int i = 0; i < writeBehindConcurrency; i++) {
if (config.getBatchingConfiguration() == null) {
this.stripes.add(new NonBatchingLocalHeapWriteBehindQueue<>(executionService, defaultThreadPool, config, cacheLoaderWriter));
this.stripes.add(new NonBatchingLocalHeapWriteBehindQueue<>(keyCleanUpMethod, executionService, defaultThreadPool, config, cacheLoaderWriter));
} else {
this.stripes.add(new BatchingLocalHeapWriteBehindQueue<>(executionService, defaultThreadPool, config, cacheLoaderWriter));
this.stripes.add(new BatchingLocalHeapWriteBehindQueue<>(keyCleanUpMethod, executionService, defaultThreadPool, config, cacheLoaderWriter));
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@
import org.ehcache.spi.service.ServiceProvider;
import org.osgi.service.component.annotations.Component;

import java.util.function.Consumer;

/**
* @author Abhilash
*
Expand Down Expand Up @@ -71,11 +73,11 @@ public void start(ServiceProvider<Service> serviceProvider) {
}

@Override
public <K, V> WriteBehind<K, V> createWriteBehindLoaderWriter(CacheLoaderWriter<K, V> cacheLoaderWriter, WriteBehindConfiguration<?> configuration) {
public <K, V> WriteBehind<K, V> createWriteBehindLoaderWriter(Consumer<K> keyCleanUpMethod, CacheLoaderWriter<K, V> cacheLoaderWriter, WriteBehindConfiguration<?> configuration) {
if (cacheLoaderWriter == null) {
throw new NullPointerException("WriteBehind requires a non null CacheLoaderWriter.");
}
return new StripedWriteBehind<>(executionService, threadPoolAlias, configuration, cacheLoaderWriter);
return new StripedWriteBehind<>(keyCleanUpMethod, executionService, threadPoolAlias, configuration, cacheLoaderWriter);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,4 +33,10 @@ public interface BatchOperation<K, V> {
*/
void performOperation(CacheLoaderWriter<K, V> cacheLoaderWriter) throws BulkCacheWritingException, Exception;

/**
* Return set of keys from batch entries
*
*/
Iterable<? extends K> getKeys();

}
Original file line number Diff line number Diff line change
Expand Up @@ -45,4 +45,9 @@ public void performOperation(CacheLoaderWriter<K, V> cacheLoaderWriter) throws B
cacheLoaderWriter.deleteAll(entries);
}

@Override
public Iterable<? extends K> getKeys() {
return entries;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,9 @@
*/
package org.ehcache.impl.internal.loaderwriter.writebehind.operations;

import java.util.HashSet;
import java.util.Map;
import java.util.Set;

import org.ehcache.spi.loaderwriter.BulkCacheWritingException;
import org.ehcache.spi.loaderwriter.CacheLoaderWriter;
Expand Down Expand Up @@ -43,4 +45,13 @@ public void performOperation(CacheLoaderWriter<K, V> cacheLoaderWriter) throws B
cacheLoaderWriter.writeAll(entries);
}

@Override
public Set<K> getKeys() {
Set<K> keys = new HashSet<>();
for (Map.Entry<? extends K, ? extends V> entry : entries) {
keys.add(entry.getKey());
}
return keys;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.ehcache.spi.loaderwriter.CacheLoaderWriterProvider;
import org.ehcache.spi.loaderwriter.WriteBehindConfiguration;
import org.ehcache.spi.loaderwriter.WriteBehindProvider;
import org.ehcache.spi.resilience.StoreAccessException;
import org.ehcache.spi.service.Service;
import org.ehcache.spi.service.ServiceConfiguration;
import org.ehcache.spi.service.ServiceDependencies;
Expand All @@ -39,13 +40,20 @@ public class LoaderWriterStoreProvider extends AbstractWrapperStoreProvider {
private volatile WriteBehindProvider writeBehindProvider;

@Override
@SuppressWarnings("unchecked")
protected <K, V> Store<K, V> wrap(Store<K, V> store, Store.Configuration<K, V> storeConfig, ServiceConfiguration<?, ?>... serviceConfigs) {
WriteBehindConfiguration<?> writeBehindConfiguration = findSingletonAmongst(WriteBehindConfiguration.class, (Object[]) serviceConfigs);
LocalLoaderWriterStore<K, V> loaderWriterStore;
if(writeBehindConfiguration == null) {
loaderWriterStore = new LocalLoaderWriterStore<>(store, storeConfig.getCacheLoaderWriter(), storeConfig.useLoaderInAtomics(), storeConfig.getExpiry());
} else {
CacheLoaderWriter<? super K, V> writeBehindLoaderWriter = writeBehindProvider.createWriteBehindLoaderWriter(storeConfig.getCacheLoaderWriter(), writeBehindConfiguration);
CacheLoaderWriter<? super K, V> writeBehindLoaderWriter = writeBehindProvider.createWriteBehindLoaderWriter( key-> {
try {
store.remove((K) key);
} catch (StoreAccessException ex) {
throw new RuntimeException(ex);
}
}, storeConfig.getCacheLoaderWriter(), writeBehindConfiguration);
loaderWriterStore = new LocalWriteBehindLoaderWriterStore<>(store, writeBehindLoaderWriter, storeConfig.useLoaderInAtomics(), storeConfig.getExpiry());
}
return loaderWriterStore;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import java.util.concurrent.Future;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeoutException;
import java.util.function.Consumer;

import org.ehcache.Cache;
import org.ehcache.CacheManager;
Expand Down Expand Up @@ -535,8 +536,8 @@ class TestWriteBehindProvider extends WriteBehindProviderFactory.Provider {

@Override
@SuppressWarnings("unchecked")
public <K, V> WriteBehind<K, V> createWriteBehindLoaderWriter(CacheLoaderWriter<K, V> cacheLoaderWriter, WriteBehindConfiguration<?> configuration) {
this.writeBehind = super.createWriteBehindLoaderWriter(cacheLoaderWriter, configuration);
public <K, V> WriteBehind<K, V> createWriteBehindLoaderWriter(Consumer<K> keyCleanUpMethod, CacheLoaderWriter<K, V> cacheLoaderWriter, WriteBehindConfiguration<?> configuration) {
this.writeBehind = super.createWriteBehindLoaderWriter(keyCleanUpMethod, cacheLoaderWriter, configuration);
return (WriteBehind<K, V>) writeBehind;
}

Expand Down
Loading