Member

@dajac dajac commented Jan 23, 2026

This patch refactors the coordinator runtime to improve modularity by
decoupling the timer and executor components:

  • Extract EventBasedCoordinatorTimer from CoordinatorRuntime into a
    standalone CoordinatorTimerImpl;
  • Introduce the CoordinatorShardScheduler interface to allow shard-scoped
    components (timer, executor) to schedule write operations back to the
    runtime within their shard's scope;
  • Simplify CoordinatorExecutorImpl by replacing its direct runtime
    dependency with the new scheduler interface.

Reviewers: Sean Quah <squah@confluent.io>, Chia-Ping Tsai
<chia7712@gmail.com>

dajac added 4 commits January 23, 2026 18:29

- Replace CoordinatorRuntime and TopicPartition dependencies with a
  functional Scheduler interface
- Remove unnecessary generic type parameter S
- Use var for local variable declarations
- Simplify tests by mocking the Scheduler instead of CoordinatorRuntime

This patch extracts EventBasedCoordinatorTimer from CoordinatorRuntime
into a standalone CoordinatorTimerImpl class, following the same pattern
as CoordinatorExecutorImpl. The new class uses a Scheduler functional
interface to decouple from CoordinatorRuntime internals.
This also adds comprehensive unit tests for CoordinatorTimerImpl.

Extract the identical WriteOperation and Scheduler interfaces from
CoordinatorExecutorImpl and CoordinatorTimerImpl into a shared
CoordinatorShardScheduler interface. This provides a common internal
API for shard-scoped components to schedule write operations through
the coordinator runtime.

Reorder constructor parameters to place the scheduler last, matching
the parameter order in CoordinatorTimerImpl.
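
The shard-scoped scheduler described in these commits can be pictured with a small sketch. Everything below is an assumption made for illustration: the names ShardScheduler, WriteOperation, and Component, and their signatures, are hypothetical and are not taken from the Kafka source. The point is only that a component holding such an interface needs neither the runtime nor the partition.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Hypothetical sketch; names and signatures are assumptions, not the Kafka source.
final class ShardSchedulerSketch {

    /** Produces the records that a write operation wants to append. */
    @FunctionalInterface
    interface WriteOperation<U> {
        List<U> generate() throws Exception;
    }

    /** Shard-scoped hook that lets a timer or executor schedule a write back into the runtime. */
    @FunctionalInterface
    interface ShardScheduler<U> {
        CompletableFuture<Void> scheduleWriteOperation(String operationName, WriteOperation<U> operation);
    }

    /** A component that only knows the scheduler, not the runtime or the partition. */
    static final class Component<U> {
        private final ShardScheduler<U> scheduler;

        Component(ShardScheduler<U> scheduler) {
            this.scheduler = scheduler;
        }

        CompletableFuture<Void> flush(String name, List<U> records) {
            return scheduler.scheduleWriteOperation(name, () -> records);
        }
    }

    /** Uses a stand-in "runtime" that runs the operation inline and collects its records. */
    static List<String> runDemo() {
        List<String> log = new ArrayList<>();
        ShardScheduler<String> scheduler = (name, op) -> {
            try {
                log.addAll(op.generate());
                return CompletableFuture.completedFuture(null);
            } catch (Exception e) {
                return CompletableFuture.failedFuture(e);
            }
        };
        new Component<>(scheduler).flush("demo", List.of("r1", "r2")).join();
        return log;
    }

    public static void main(String[] args) {
        System.out.println(runDemo());
    }
}
```

Because the interface is functional, a test can supply a lambda or a mock in place of the runtime, which matches the "mock the Scheduler instead of CoordinatorRuntime" note in the first commit.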
Member Author

dajac commented Jan 23, 2026

@squah-confluent Could you please review?

Member

@chia7712 chia7712 left a comment

@dajac thanks for this great refactor

@Override
public void schedule(
    String key,
    long delay,
Member

Have you considered using a Duration to replace both delay and unit?

Member Author

I have not considered it. I will consider it separately and raise a PR if it makes sense to do it.
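
For context, the Duration suggestion could look roughly like the sketch below. The Timer interface here is a hypothetical, simplified stand-in (the schedule method under review takes more parameters than the two visible in the diff), so this only shows the shape of an overload that delegates to the existing (delay, unit) form.

```java
import java.time.Duration;
import java.util.concurrent.TimeUnit;

// Hypothetical, simplified timer API; the real schedule(...) method takes more parameters.
final class DurationOverloadSketch {

    interface Timer {
        void schedule(String key, long delay, TimeUnit unit);

        /** Convenience overload: one Duration instead of a (delay, unit) pair. */
        default void schedule(String key, Duration delay) {
            // toMillis() truncates anything finer than a millisecond.
            schedule(key, delay.toMillis(), TimeUnit.MILLISECONDS);
        }
    }

    /** Schedules with a Duration and returns the delay, in ms, that the underlying method received. */
    static long delayMsFor(Duration delay) {
        long[] seen = new long[1];
        Timer timer = (key, d, unit) -> seen[0] = unit.toMillis(d);
        timer.schedule("session-timeout", delay);
        return seen[0];
    }

    public static void main(String[] args) {
        System.out.println(delayMsFor(Duration.ofSeconds(2)) + "ms");
    }
}
```

A default method keeps existing callers and implementations unchanged, which is one reason such a change can be handled in a separate PR.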

};

log.debug("Registering timer {} with delay of {}ms.", key, unit.toMillis(delay));
var prevTask = tasks.put(key, task);
Member

var prevTask = tasks.put(key, task);
if (!tasks.remove(key, this))

Is it possible that two different threads change the tasks concurrently after this decoupling?

Member Author

No, the threading model is still the same.

Member

> the threading model is still the same.

If scheduler.scheduleWriteOperation returns a failed CompletableFuture, the exceptionally block will be executed by the timer thread. Does it break the threading model?
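
The general CompletableFuture behaviour behind this question can be checked with plain JDK code. The sketch below is not Kafka code and uses made-up names. It shows that a non-async exceptionally handler attached to an already-failed future runs on the thread that attaches it, while a handler attached before completion runs on the thread that later fails the future. Whether that matters for the coordinator's threading model depends on what the handler touches, which this sketch does not settle.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;

// Illustrative only: plain JDK behaviour, not code from the Kafka runtime.
final class ExceptionallyThreadSketch {

    /** Attaches a handler, optionally fails the future from another thread, and reports where the handler ran. */
    static String handlerThread(CompletableFuture<Void> future, boolean failFromOtherThread) {
        AtomicReference<String> seen = new AtomicReference<>();
        CompletableFuture<Void> handled = future.exceptionally(ex -> {
            seen.set(Thread.currentThread().getName());
            return null;
        });
        if (failFromOtherThread) {
            Thread completer = new Thread(
                () -> future.completeExceptionally(new IllegalStateException("boom")), "completer");
            completer.start();
            try {
                completer.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        handled.join();
        return seen.get();
    }

    /** The future is failed before the handler is attached. */
    static String alreadyFailed() {
        return handlerThread(CompletableFuture.failedFuture(new IllegalStateException("boom")), false);
    }

    /** The handler is attached first; a thread named "completer" fails the future afterwards. */
    static String failedLater() {
        return handlerThread(new CompletableFuture<>(), true);
    }

    public static void main(String[] args) {
        System.out.println("already failed -> " + alreadyFailed());
        System.out.println("failed later   -> " + failedLater());
    }
}
```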

// Execute the timeout operation.
return operation.generateRecords();
}
).exceptionally(ex -> {
Member

What if scheduleWriteOperation returns a failed future? Will the task not be removed?

Member Author

Good catch. We should remove it here.

Member Author

Fixed it.

log.debug("Scheduling write event {} for timer {}.", event.name, key);
try {
    enqueueLast(event);
} catch (NotCoordinatorException ex) {
Contributor

The exceptions that come out of scheduleWriteOperation/enqueueLast are very confusing. I think it would help if we were a bit more thorough with the @throws tags in the Javadocs, not necessarily in this PR.

I think the RejectedExecutionException handler within the exceptionally above will never be hit because scheduleWriteOperation will throw it directly? It seems that the future can complete with RejectedExecutionException and that scheduleWriteOperation can also throw it directly.

This patch fixes two issues in CoordinatorTimerImpl:

1. Synchronous exceptions from scheduleWriteOperation were not caught,
   leaving tasks in the map. Fixed by wrapping the scheduler lambda
   with try-catch in CoordinatorRuntime.CoordinatorContext to convert
   exceptions to failed futures.

2. Tasks were not removed in the exceptionally handler when the write
   operation failed. Fixed by adding defensive task cleanup as the
   first step in the exceptionally handler, following the same pattern
   as CoordinatorExecutorImpl.
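
The second fix can be illustrated with a minimal sketch. This is not the CoordinatorTimerImpl code: the class, the fire method, and the Supplier standing in for the scheduled write are all hypothetical, and the success path is simplified. It only shows the pattern of removing the task as the first step of the exceptionally handler, using the two-argument Map.remove so that a newer task registered under the same key is left alone.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Supplier;

// Hypothetical sketch of defensive cleanup; not the actual CoordinatorTimerImpl.
final class TimerCleanupSketch {
    private final Map<String, Object> tasks = new ConcurrentHashMap<>();

    /** Registers a task and fires it at once; write stands in for scheduling the write operation. */
    void fire(String key, Supplier<CompletableFuture<Void>> write) {
        Object task = new Object();
        tasks.put(key, task);
        write.get()
            .thenRun(() -> tasks.remove(key, task))
            .exceptionally(ex -> {
                // Cleanup comes first: remove only if the key still maps to this task,
                // so a newer task registered under the same key is not dropped.
                tasks.remove(key, task);
                return null;
            });
    }

    boolean isRegistered(String key) {
        return tasks.containsKey(key);
    }

    public static void main(String[] args) {
        TimerCleanupSketch timer = new TimerCleanupSketch();
        timer.fire("k", () -> CompletableFuture.failedFuture(new RuntimeException("write failed")));
        System.out.println("still registered after failure: " + timer.isRegistered("k"));
    }
}
```

This sketch assumes the write call never throws synchronously, which is what the first fix in the commit message is meant to guarantee.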
Member Author

dajac commented Jan 24, 2026

> The exceptions that come out of scheduleWriteOperation/enqueueLast are very confusing. I think it would help if we were a bit more thorough with the @throws tags in the Javadocs, not necessarily in this PR.

Yes, I agree.

> I think the RejectedExecutionException handler within the exceptionally above will never be hit because scheduleWriteOperation will throw it directly? It seems that the future can complete with RejectedExecutionException and that scheduleWriteOperation can also throw it directly.

scheduleWriteOperation should not throw RejectedExecutionException but the timer and the executor use it internally. The part which annoys me is that scheduleWriteOperation can throw synchronously and fail the future. It would be better to only use one pattern. I will try to improve this separately too.

Comment on lines +486 to +494
try {
    return scheduleWriteOperation(
        operationName,
        tp,
        coordinator -> operation.generate()
    );
} catch (Throwable t) {
    return CompletableFuture.failedFuture(t);
}
Member Author

I handle synchronous exceptions here so components can only work with the future. It is simpler this way.
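
The effect on callers can be seen in a self-contained sketch. The helper name callSafely and the demo methods are hypothetical. The try/catch mirrors the snippet under review, including catching Throwable, which is a broad choice that some code bases would narrow.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

// Hypothetical helper; shows the "always return a future" pattern in isolation.
final class FailedFutureSketch {

    /** Converts a synchronous throw from the call into a failed future. */
    static <T> CompletableFuture<T> callSafely(Supplier<CompletableFuture<T>> call) {
        try {
            return call.get();
        } catch (Throwable t) {
            return CompletableFuture.failedFuture(t);
        }
    }

    /** The caller has one error path: inspect the future. */
    static String describe(CompletableFuture<String> future) {
        return future.handle((value, ex) -> ex == null ? "ok:" + value : "failed:" + ex.getMessage()).join();
    }

    /** The call throws before producing any future. */
    static String demoSyncThrow() {
        return describe(FailedFutureSketch.<String>callSafely(() -> {
            throw new IllegalStateException("not coordinator");
        }));
    }

    /** The call returns a future that is already failed. */
    static String demoFailedFuture() {
        return describe(FailedFutureSketch.<String>callSafely(
            () -> CompletableFuture.failedFuture(new IllegalStateException("not coordinator"))));
    }

    public static void main(String[] args) {
        System.out.println(demoSyncThrow());
        System.out.println(demoFailedFuture());
    }
}
```

Both demos take the same path through describe, so a component built on the wrapper does not need its own try/catch around the scheduling call.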

Contributor

@squah-confluent squah-confluent left a comment

Thanks for the refactor!

@dajac dajac requested a review from chia7712 January 24, 2026 11:57
@chia7712
Member

> scheduleWriteOperation should not throw RejectedExecutionException but the timer and the executor use it internally. The part which annoys me is that scheduleWriteOperation can throw synchronously and fail the future. It would be better to only use one pattern. I will try to improve this separately too.

Agreed. It would be good to handle both in one fell swoop. A method returning a CompletableFuture should NOT throw exceptions directly.

@chia7712 chia7712 merged commit 5f0e368 into apache:trunk Jan 26, 2026
26 checks passed
@dajac dajac deleted the minor-timer-cleanup branch January 26, 2026 09:15