Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -95,13 +95,16 @@ protected List<SubscriptionEvent> generateSubscriptionEvents() throws Exception

final List<SubscriptionEvent> events = new ArrayList<>();
final List<File> tsFiles = batch.sealTsFiles();
final AtomicInteger referenceCount = new AtomicInteger(tsFiles.size());
final AtomicInteger ackReferenceCount = new AtomicInteger(tsFiles.size());
final AtomicInteger cleanReferenceCount = new AtomicInteger(tsFiles.size());
for (final File tsFile : tsFiles) {
final SubscriptionCommitContext commitContext =
prefetchingQueue.generateSubscriptionCommitContext();
events.add(
new SubscriptionEvent(
new SubscriptionPipeTsFileBatchEvents(this, referenceCount), tsFile, commitContext));
new SubscriptionPipeTsFileBatchEvents(this, ackReferenceCount, cleanReferenceCount),
tsFile,
commitContext));
}
return events;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,26 +28,30 @@
public class SubscriptionPipeTsFileBatchEvents implements SubscriptionPipeEvents {

private final SubscriptionPipeTsFileEventBatch batch;
private final AtomicInteger referenceCount; // shared between the same batch
private final AtomicInteger ackReferenceCount; // shared between the same batch
private final AtomicInteger cleanReferenceCount; // shared between the same batch
private final int count; // snapshot the initial reference count, used for event count calculation

public SubscriptionPipeTsFileBatchEvents(
final SubscriptionPipeTsFileEventBatch batch, final AtomicInteger referenceCount) {
final SubscriptionPipeTsFileEventBatch batch,
final AtomicInteger ackReferenceCount,
final AtomicInteger cleanReferenceCount) {
this.batch = batch;
this.referenceCount = referenceCount;
this.count = Math.max(1, referenceCount.get());
this.ackReferenceCount = ackReferenceCount;
this.cleanReferenceCount = cleanReferenceCount;
this.count = Math.max(1, ackReferenceCount.get());
}

@Override
public void ack() {
if (referenceCount.decrementAndGet() == 0) {
if (ackReferenceCount.decrementAndGet() == 0) {
batch.ack();
}
}

@Override
public void cleanUp(final boolean force) {
if (referenceCount.decrementAndGet() == 0) {
if (cleanReferenceCount.decrementAndGet() == 0) {
batch.cleanUp(force);
}
}
Expand Down
Loading