-
Notifications
You must be signed in to change notification settings - Fork 14k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
KAFKA-17593; [7/N] Introduce CoordinatorExecutor #17823
base: trunk
Are you sure you want to change the base?
Conversation
Please enter the commit message for your changes. Lines starting
@@ -642,6 +664,7 @@ public void testScheduleLoading() { | |||
when(builder.withTimer(any())).thenReturn(builder); | |||
when(builder.withCoordinatorMetrics(any())).thenReturn(builder); | |||
when(builder.withTopicPartition(any())).thenReturn(builder); | |||
when(builder.withExecutor(any())).thenReturn(builder); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
fyi, i'll refactor those tests in a separate PR. i'll also so some cleanups.
@smjn fyi. Potentially this is useful for the record deletion work that you're doing. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks @dajac , took a first pass to non-testing files.
if (tasks.containsKey(key)) return false; | ||
|
||
// We use the task as a lock for the key. | ||
tasks.put(key, task); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
what about
if (tasks.containsKey(key)) return false; | |
// We use the task as a lock for the key. | |
tasks.put(key, task); | |
if (tasks.putIfAbsent(key, task) != null) return false; |
// that the task was either replaced or cancelled. We stop. | ||
if (tasks.get(key) != task) return; | ||
|
||
// Executor the task. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
// Executor the task. | |
// Execute the task. |
return operation.onComplete(result.result, result.exception); | ||
} | ||
).exceptionally(exception -> { | ||
if (exception instanceof NotCoordinatorException || exception instanceof CoordinatorLoadInProgressException) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
could we end up here due to a failure on scheduling the write op, without having removed the task? (ln 102). Wondering if we need to remove it here too to clean up.
} | ||
|
||
/** | ||
* Schedule an asynchronous tasks. Note that only one task for a given key can |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
* Schedule an asynchronous tasks. Note that only one task for a given key can | |
* Schedule an asynchronous task. Note that only one task for a given key can |
This patch introduces the
CoordinatorExecutor
construct into theCoordinatorRuntime
. It allows scheduling asynchronous tasks from within aCoordinatorShard
while respecting the runtime semantic. It will be used to asynchronously resolve regular expressions.The
GroupCoordinatorService
uses a defaultExecutorService
with a single thread to back it at the moment. It seems that it should be sufficient. In the future, we could consider making the number of threads configurable.Committer Checklist (excluded from commit message)