Skip to content

Commit 71449aa

Browse files
authored
KAFKA-20106: Ensure reconciled assignment updated within poll (#21495)
Fix to ensure that reconciled assignments are only updated in the subscription state within a call to consumer.poll. Before this PR, assignment changes occurred in the background thread when async operations (commit/callback) completed: (1) commit → (2) revoke callback → (3) assignment update → (4) assign callback, potentially causing IllegalStateException when applications called seek/position on consumer.assignment(). With this PR, we piggyback the assignment update on the existing mechanism that triggers the onPartitionsAssigned callback (consolidating steps 3 and 4 above into a single one). Replace CallbackNeededEvent with a new PartitionsAssignedEvent sent to the app thread after every reconciliation. This event performs both the assignment update and the onPartitionsAssigned callback (if needed). This ensures assignment changes happen within poll() in all cases (commit or not, callbacks or not). The fix applies to the KafkaConsumer only. The ShareConsumer behaviour remains unchanged with this PR (it still performs the assignment update in the background). Reviewers: Lucas Brutschy <lbrutschy@confluent.io>, David Jacot <djacot@confluent.io>
1 parent 5af84cf commit 71449aa

File tree

12 files changed

+388
-85
lines changed

12 files changed

+388
-85
lines changed

clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractMembershipManager.java

Lines changed: 9 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -505,20 +505,6 @@ private void clearAssignment() {
505505
clearPendingAssignmentsAndLocalNamesCache();
506506
}
507507

508-
/**
509-
* Update a new assignment by setting the assigned partitions in the member subscription.
510-
* This will mark the newly added partitions as pending callback, to prevent fetching records
511-
* or updating positions for them while the callback runs.
512-
*
513-
* @param assignedPartitions Full assignment, to update in the subscription state
514-
* @param addedPartitions Newly added partitions
515-
*/
516-
private void updateSubscriptionAwaitingCallback(TopicIdPartitionSet assignedPartitions,
517-
SortedSet<TopicPartition> addedPartitions) {
518-
subscriptions.assignFromSubscribedAwaitingCallback(assignedPartitions.topicPartitions(), addedPartitions);
519-
notifyAssignmentChange(assignedPartitions.topicPartitions());
520-
}
521-
522508
/**
523509
* Transition to the {@link MemberState#JOINING} state, indicating that the member will
524510
* try to join the group on the next heartbeat request. This is expected to be invoked when
@@ -1192,12 +1178,11 @@ private CompletableFuture<Void> assignPartitions(
11921178
TopicIdPartitionSet assignedPartitions,
11931179
SortedSet<TopicPartition> addedPartitions) {
11941180

1195-
// Update assignment in the subscription state, and ensure that no fetching or positions
1196-
// initialization happens for the newly added partitions while the callback runs.
1197-
updateSubscriptionAwaitingCallback(assignedPartitions, addedPartitions);
1181+
// Signal that new partitions have been reconciled so that type-specific actions can be taken.
1182+
// - ShareMembershipManager: updates subscription immediately and returns completed future
1183+
// - ConsumerMembershipManager: enqueues event for app thread to apply assignment within poll() and run callbacks
1184+
CompletableFuture<Void> result = signalPartitionsAssigned(assignedPartitions, addedPartitions);
11981185

1199-
// Invoke user call back.
1200-
CompletableFuture<Void> result = signalPartitionsAssigned(addedPartitions);
12011186
// Enable newly added partitions to start fetching and updating positions for them.
12021187
result.whenComplete((__, exception) -> {
12031188
if (exception == null) {
@@ -1230,10 +1215,12 @@ private CompletableFuture<Void> assignPartitions(
12301215
/**
12311216
* Signals to the membership manager that partitions are being assigned so that actions
12321217
* specific to the group type can be taken.
1218+
*
1219+
* @param assignedPartitions The full assignment to apply
1220+
* @param addedPartitions The newly added partitions (used for callback and subscription update)
12331221
*/
1234-
public CompletableFuture<Void> signalPartitionsAssigned(Set<TopicPartition> partitionsAssigned) {
1235-
return CompletableFuture.completedFuture(null);
1236-
}
1222+
protected abstract CompletableFuture<Void> signalPartitionsAssigned(TopicIdPartitionSet assignedPartitions,
1223+
SortedSet<TopicPartition> addedPartitions);
12371224

12381225
/**
12391226
* Signals to the membership manager that partitions are being revoked so that actions

clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java

Lines changed: 55 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@
3939
import org.apache.kafka.clients.consumer.internals.events.ApplicationEvent;
4040
import org.apache.kafka.clients.consumer.internals.events.ApplicationEventHandler;
4141
import org.apache.kafka.clients.consumer.internals.events.ApplicationEventProcessor;
42+
import org.apache.kafka.clients.consumer.internals.events.ApplyAssignmentEvent;
4243
import org.apache.kafka.clients.consumer.internals.events.AssignmentChangeEvent;
4344
import org.apache.kafka.clients.consumer.internals.events.AsyncCommitEvent;
4445
import org.apache.kafka.clients.consumer.internals.events.AsyncPollEvent;
@@ -51,14 +52,15 @@
5152
import org.apache.kafka.clients.consumer.internals.events.CompletableEvent;
5253
import org.apache.kafka.clients.consumer.internals.events.CompletableEventReaper;
5354
import org.apache.kafka.clients.consumer.internals.events.ConsumerRebalanceListenerCallbackCompletedEvent;
54-
import org.apache.kafka.clients.consumer.internals.events.ConsumerRebalanceListenerCallbackNeededEvent;
5555
import org.apache.kafka.clients.consumer.internals.events.CreateFetchRequestsEvent;
5656
import org.apache.kafka.clients.consumer.internals.events.CurrentLagEvent;
5757
import org.apache.kafka.clients.consumer.internals.events.ErrorEvent;
5858
import org.apache.kafka.clients.consumer.internals.events.EventProcessor;
5959
import org.apache.kafka.clients.consumer.internals.events.FetchCommittedOffsetsEvent;
6060
import org.apache.kafka.clients.consumer.internals.events.LeaveGroupOnCloseEvent;
6161
import org.apache.kafka.clients.consumer.internals.events.ListOffsetsEvent;
62+
import org.apache.kafka.clients.consumer.internals.events.PartitionsAssignedEvent;
63+
import org.apache.kafka.clients.consumer.internals.events.PartitionsRemovedEvent;
6264
import org.apache.kafka.clients.consumer.internals.events.PausePartitionsEvent;
6365
import org.apache.kafka.clients.consumer.internals.events.ResetOffsetEvent;
6466
import org.apache.kafka.clients.consumer.internals.events.ResumePartitionsEvent;
@@ -143,6 +145,7 @@
143145

144146
import static java.util.Objects.requireNonNull;
145147
import static org.apache.kafka.clients.consumer.internals.AbstractMembershipManager.TOPIC_PARTITION_COMPARATOR;
148+
import static org.apache.kafka.clients.consumer.internals.ConsumerRebalanceListenerMethodName.ON_PARTITIONS_ASSIGNED;
146149
import static org.apache.kafka.clients.consumer.internals.ConsumerUtils.CONSUMER_JMX_PREFIX;
147150
import static org.apache.kafka.clients.consumer.internals.ConsumerUtils.CONSUMER_METRIC_GROUP;
148151
import static org.apache.kafka.clients.consumer.internals.ConsumerUtils.DEFAULT_CLOSE_TIMEOUT_MS;
@@ -194,8 +197,12 @@ public void process(final BackgroundEvent event) {
194197
process((ErrorEvent) event);
195198
break;
196199

197-
case CONSUMER_REBALANCE_LISTENER_CALLBACK_NEEDED:
198-
process((ConsumerRebalanceListenerCallbackNeededEvent) event);
200+
case PARTITIONS_ASSIGNED:
201+
process((PartitionsAssignedEvent) event);
202+
break;
203+
204+
case PARTITIONS_REMOVED:
205+
process((PartitionsRemovedEvent) event);
199206
break;
200207

201208
case STREAMS_ON_TASKS_REVOKED_CALLBACK_NEEDED:
@@ -220,12 +227,51 @@ private void process(final ErrorEvent event) {
220227
throw event.error();
221228
}
222229

223-
private void process(final ConsumerRebalanceListenerCallbackNeededEvent event) {
230+
/**
231+
* Processing this event will perform the actions needed in the app thread when new partitions are reconciled in the background:
232+
* - apply assignment changes (ensuring they happen in the background but triggered within the app thread poll)
233+
* - run onPartitionsAssigned callback if present
234+
* - notify background thread so it can carry on (e.g., send ack to the broker)
235+
*/
236+
private void process(final PartitionsAssignedEvent event) {
237+
238+
applyNewAssignment(event);
239+
240+
if (subscriptions.rebalanceListener().isEmpty()) {
241+
event.future().complete(null);
242+
} else {
243+
invokeRebalanceCallbackAndNotifyBackgroundThread(ON_PARTITIONS_ASSIGNED, event.addedPartitions(), event.future());
244+
}
245+
}
246+
247+
/**
248+
* Send event to the background to update the assignment in the subscription state.
249+
* Block on it to complete to ensure the assignment change happens within a call to
250+
* consumer.poll.
251+
* Note that this event only happens when there is a pending assignment (reconciliation
252+
* completed in the background)
253+
*/
254+
private void applyNewAssignment(final PartitionsAssignedEvent event) {
255+
ApplyAssignmentEvent applyEvent = new ApplyAssignmentEvent(
256+
event.assignedPartitions(),
257+
event.addedPartitions()
258+
);
259+
applicationEventHandler.addAndGet(applyEvent);
260+
}
261+
262+
private void process(final PartitionsRemovedEvent event) {
263+
invokeRebalanceCallbackAndNotifyBackgroundThread(event.methodName(), event.partitions(), event.future());
264+
}
265+
266+
private void invokeRebalanceCallbackAndNotifyBackgroundThread(
267+
ConsumerRebalanceListenerMethodName methodName,
268+
SortedSet<TopicPartition> partitions,
269+
CompletableFuture<Void> future) {
224270
ConsumerRebalanceListenerCallbackCompletedEvent invokedEvent = invokeRebalanceCallbacks(
225271
rebalanceListenerInvoker,
226-
event.methodName(),
227-
event.partitions(),
228-
event.future()
272+
methodName,
273+
partitions,
274+
future
229275
);
230276
applicationEventHandler.add(invokedEvent);
231277
if (invokedEvent.error().isPresent()) {
@@ -2252,9 +2298,9 @@ boolean processBackgroundEvents() {
22522298
* {@link ConsumerRebalanceListener#onPartitionsRevoked(Collection)} callback needs to be invoked for any
22532299
* partitions the consumer owns. However,
22542300
* this callback must be executed on the application thread. To achieve this, the background thread enqueues a
2255-
* {@link ConsumerRebalanceListenerCallbackNeededEvent} on its background event queue. That event queue is
2301+
* {@link PartitionsRemovedEvent} on its background event queue. That event queue is
22562302
* periodically queried by the application thread to see if there's work to be done. When the application thread
2257-
* sees {@link ConsumerRebalanceListenerCallbackNeededEvent}, it is processed, and then a
2303+
* sees {@link PartitionsRemovedEvent}, it is processed, and then a
22582304
* {@link ConsumerRebalanceListenerCallbackCompletedEvent} is then enqueued by the application thread on the
22592305
* application event queue. Moments later, the background thread will see that event, process it, and continue
22602306
* execution of the rebalancing logic. The rebalancing logic cannot complete until the

clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerMembershipManager.java

Lines changed: 47 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -19,11 +19,13 @@
1919
import org.apache.kafka.clients.consumer.CloseOptions;
2020
import org.apache.kafka.clients.consumer.ConsumerConfig;
2121
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
22+
import org.apache.kafka.clients.consumer.internals.events.ApplyAssignmentEvent;
2223
import org.apache.kafka.clients.consumer.internals.events.BackgroundEventHandler;
2324
import org.apache.kafka.clients.consumer.internals.events.CompletableBackgroundEvent;
2425
import org.apache.kafka.clients.consumer.internals.events.ConsumerRebalanceListenerCallbackCompletedEvent;
25-
import org.apache.kafka.clients.consumer.internals.events.ConsumerRebalanceListenerCallbackNeededEvent;
2626
import org.apache.kafka.clients.consumer.internals.events.ErrorEvent;
27+
import org.apache.kafka.clients.consumer.internals.events.PartitionsAssignedEvent;
28+
import org.apache.kafka.clients.consumer.internals.events.PartitionsRemovedEvent;
2729
import org.apache.kafka.clients.consumer.internals.metrics.ConsumerRebalanceMetricsManager;
2830
import org.apache.kafka.clients.consumer.internals.metrics.RebalanceMetricsManager;
2931
import org.apache.kafka.common.KafkaException;
@@ -48,7 +50,6 @@
4850
import static org.apache.kafka.clients.consumer.CloseOptions.GroupMembershipOperation.DEFAULT;
4951
import static org.apache.kafka.clients.consumer.CloseOptions.GroupMembershipOperation.LEAVE_GROUP;
5052
import static org.apache.kafka.clients.consumer.CloseOptions.GroupMembershipOperation.REMAIN_IN_GROUP;
51-
import static org.apache.kafka.clients.consumer.internals.ConsumerRebalanceListenerMethodName.ON_PARTITIONS_ASSIGNED;
5253
import static org.apache.kafka.clients.consumer.internals.ConsumerRebalanceListenerMethodName.ON_PARTITIONS_LOST;
5354
import static org.apache.kafka.clients.consumer.internals.ConsumerRebalanceListenerMethodName.ON_PARTITIONS_REVOKED;
5455

@@ -134,7 +135,7 @@ public class ConsumerMembershipManager extends AbstractMembershipManager<Consume
134135

135136
/**
136137
* Serves as the conduit by which we can report events to the application thread. This is needed as we send
137-
* {@link ConsumerRebalanceListenerCallbackNeededEvent callbacks} and, if needed,
138+
* {@link PartitionsAssignedEvent}, {@link PartitionsRemovedEvent} and, if needed,
138139
* {@link ErrorEvent errors} to the application thread.
139140
*/
140141
private final BackgroundEventHandler backgroundEventHandler;
@@ -360,17 +361,6 @@ private CompletableFuture<Void> invokeOnPartitionsRevokedCallback(Set<TopicParti
360361
}
361362
}
362363

363-
private CompletableFuture<Void> invokeOnPartitionsAssignedCallback(Set<TopicPartition> partitionsAssigned) {
364-
// This should always trigger the callback, even if partitionsAssigned is empty, to keep
365-
// the current behaviour.
366-
Optional<ConsumerRebalanceListener> listener = subscriptions.rebalanceListener();
367-
if (listener.isPresent()) {
368-
return enqueueConsumerRebalanceListenerCallback(ON_PARTITIONS_ASSIGNED, partitionsAssigned);
369-
} else {
370-
return CompletableFuture.completedFuture(null);
371-
}
372-
}
373-
374364
private CompletableFuture<Void> invokeOnPartitionsLostCallback(Set<TopicPartition> partitionsLost) {
375365
// This should not trigger the callback if partitionsLost is empty, to keep the current
376366
// behaviour.
@@ -386,8 +376,12 @@ private CompletableFuture<Void> invokeOnPartitionsLostCallback(Set<TopicPartitio
386376
* {@inheritDoc}
387377
*/
388378
@Override
389-
public CompletableFuture<Void> signalPartitionsAssigned(Set<TopicPartition> partitionsAssigned) {
390-
return invokeOnPartitionsAssignedCallback(partitionsAssigned);
379+
protected CompletableFuture<Void> signalPartitionsAssigned(TopicIdPartitionSet assignedPartitions,
380+
SortedSet<TopicPartition> addedPartitions) {
381+
// Send an event to notify the app thread that the assignment changed with new partitions.
382+
// The app thread is expected to trigger the assignment update (within a call to poll),
383+
// and to run the onPartitionsAssigned callback if needed.
384+
return enqueuePartitionsAssignedEvent(assignedPartitions.topicPartitions(), addedPartitions);
391385
}
392386

393387
/**
@@ -440,16 +434,16 @@ public boolean isLeavingGroup() {
440434
}
441435

442436
/**
443-
* Enqueue a {@link ConsumerRebalanceListenerCallbackNeededEvent} to trigger the execution of the
444-
* appropriate {@link ConsumerRebalanceListener} {@link ConsumerRebalanceListenerMethodName method} on the
445-
* application thread.
437+
* Enqueue a {@link PartitionsRemovedEvent} to trigger the execution of either
438+
* {@link ConsumerRebalanceListener#onPartitionsRevoked} or {@link ConsumerRebalanceListener#onPartitionsLost}
439+
* on the application thread.
446440
*
447441
* <p/>
448442
*
449443
* Because the reconciliation process (run in the background thread) will be blocked by the application thread
450444
* until it completes this, we need to provide a {@link CompletableFuture} by which to remember where we left off.
451445
*
452-
* @param methodName Callback method that needs to be executed on the application thread
446+
* @param methodName Callback method that needs to be executed (ON_PARTITIONS_REVOKED or ON_PARTITIONS_LOST)
453447
* @param partitions Partitions to supply to the callback method
454448
* @return Future that will be chained within the rest of the reconciliation logic
455449
*/
@@ -458,12 +452,29 @@ private CompletableFuture<Void> enqueueConsumerRebalanceListenerCallback(Consume
458452
SortedSet<TopicPartition> sortedPartitions = new TreeSet<>(TOPIC_PARTITION_COMPARATOR);
459453
sortedPartitions.addAll(partitions);
460454

461-
CompletableBackgroundEvent<Void> event = new ConsumerRebalanceListenerCallbackNeededEvent(methodName, sortedPartitions);
455+
CompletableBackgroundEvent<Void> event = new PartitionsRemovedEvent(methodName, sortedPartitions);
462456
backgroundEventHandler.add(event);
463457
log.debug("The event to trigger the {} method execution was enqueued successfully", methodName.fullyQualifiedMethodName());
464458
return event.future();
465459
}
466460

461+
/**
462+
* Enqueue a {@link PartitionsAssignedEvent} to the application thread.
463+
* This event handles the assignment update and optional onPartitionsAssigned callback.
464+
*
465+
* @param fullAssignment The full assignment to apply
466+
* @param addedPartitions The newly added partitions (passed to the callback)
467+
* @return Future that will be chained within the rest of the reconciliation logic
468+
*/
469+
private CompletableFuture<Void> enqueuePartitionsAssignedEvent(Set<TopicPartition> fullAssignment,
470+
SortedSet<TopicPartition> addedPartitions) {
471+
CompletableBackgroundEvent<Void> event = new PartitionsAssignedEvent(fullAssignment, addedPartitions);
472+
backgroundEventHandler.add(event);
473+
log.debug("The event to update the new assignment and trigger onPartitionsAssigned callback if needed " +
474+
"has been enqueued successfully to be sent to the app thread.");
475+
return event.future();
476+
}
477+
467478
/**
468479
* Signals that a {@link ConsumerRebalanceListener} callback has completed. This is invoked when the
469480
* application thread has completed the callback and has submitted a
@@ -497,6 +508,21 @@ public void consumerRebalanceListenerCallbackCompleted(ConsumerRebalanceListener
497508
}
498509
}
499510

511+
/**
512+
* Apply the assignment update to the subscription state. This is called from the background
513+
* thread when processing an {@link ApplyAssignmentEvent} that was triggered by the application
514+
* thread during poll. This ensures that the assignment update happens on the background thread
515+
* but is coordinated by the application thread, so consumer.assignment() only changes within
516+
* a call to consumer.poll().
517+
*
518+
* @param assignedPartitions The full assignment to apply
519+
* @param addedPartitions The newly added partitions
520+
*/
521+
public void applyAssignment(Set<TopicPartition> assignedPartitions, SortedSet<TopicPartition> addedPartitions) {
522+
subscriptions.assignFromSubscribedAwaitingCallback(assignedPartitions, addedPartitions);
523+
notifyAssignmentChange(assignedPartitions);
524+
}
525+
500526
/**
501527
* {@inheritDoc}
502528
*/

clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareMembershipManager.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import org.apache.kafka.clients.Metadata;
2020
import org.apache.kafka.clients.consumer.ConsumerConfig;
2121
import org.apache.kafka.clients.consumer.internals.metrics.ShareRebalanceMetricsManager;
22+
import org.apache.kafka.common.TopicPartition;
2223
import org.apache.kafka.common.Uuid;
2324
import org.apache.kafka.common.message.ShareGroupHeartbeatResponseData;
2425
import org.apache.kafka.common.metrics.Metrics;
@@ -32,6 +33,7 @@
3233
import java.util.Map;
3334
import java.util.SortedSet;
3435
import java.util.TreeSet;
36+
import java.util.concurrent.CompletableFuture;
3537

3638
/**
3739
* Group manager for a single consumer that has a group id defined in the config
@@ -175,6 +177,19 @@ public void onHeartbeatSuccess(ShareGroupHeartbeatResponse response) {
175177
}
176178
}
177179

180+
/**
181+
* {@inheritDoc}
182+
* <p>
183+
* For the ShareConsumer, assignment changes are applied immediately in the background thread.
184+
*/
185+
@Override
186+
protected CompletableFuture<Void> signalPartitionsAssigned(TopicIdPartitionSet assignedPartitions,
187+
SortedSet<TopicPartition> addedPartitions) {
188+
subscriptions.assignFromSubscribedAwaitingCallback(assignedPartitions.topicPartitions(), addedPartitions);
189+
notifyAssignmentChange(assignedPartitions.topicPartitions());
190+
return CompletableFuture.completedFuture(null);
191+
}
192+
178193
@Override
179194
public int joinGroupEpoch() {
180195
return ShareGroupHeartbeatRequest.JOIN_GROUP_MEMBER_EPOCH;

clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEvent.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ public enum Type {
4343
STREAMS_ON_TASKS_ASSIGNED_CALLBACK_COMPLETED,
4444
STREAMS_ON_TASKS_REVOKED_CALLBACK_COMPLETED,
4545
STREAMS_ON_ALL_TASKS_LOST_CALLBACK_COMPLETED,
46+
APPLY_ASSIGNMENT,
4647
}
4748

4849
private final Type type;

0 commit comments

Comments
 (0)