@@ -299,6 +299,23 @@ private static UpdateTargetAssignmentResult<Assignment> fromLastTargetAssignment
299299 group .targetAssignment (member .memberId ())
300300 );
301301 }
302+
303+ private static UpdateTargetAssignmentResult <TasksTuple > fromLastTargetAssignment (
304+ StreamsGroup group ,
305+ Optional <StreamsGroupMember > member
306+ ) {
307+ if (member .isPresent ()) {
308+ return new UpdateTargetAssignmentResult <>(
309+ group .assignmentEpoch (),
310+ group .targetAssignment (member .get ().memberId ())
311+ );
312+ } else {
313+ return new UpdateTargetAssignmentResult <>(
314+ group .assignmentEpoch (),
315+ TasksTuple .EMPTY
316+ );
317+ }
318+ }
302319 }
303320
304321 public static class Builder {
@@ -2086,36 +2103,16 @@ private CoordinatorResult<StreamsGroupHeartbeatResult, CoordinatorRecord> stream
20862103 // 4. Update the target assignment if the group epoch is larger than the target assignment epoch or a static member
20872104 // replaces an existing static member.
20882105 // The delta between the existing and the new target assignment is persisted to the partition.
2089- int targetAssignmentEpoch ;
2090- TasksTuple targetAssignment ;
2091- if (groupEpoch > group .assignmentEpoch ()) {
2092- boolean initialDelayActive = timer .isScheduled (streamsInitialRebalanceKey (groupId ));
2093- if (initialDelayActive ) {
2094- // During initial rebalance delay, return empty assignment to first joining members.
2095- targetAssignmentEpoch = Math .max (1 , group .assignmentEpoch ());
2096- targetAssignment = TasksTuple .EMPTY ;
2097-
2098- returnedStatus .add (
2099- new Status ()
2100- .setStatusCode (StreamsGroupHeartbeatResponse .Status .ASSIGNMENT_DELAYED .code ())
2101- .setStatusDetail ("Assignment delayed due to the configured initial rebalance delay." )
2102- );
2103- } else {
2104- targetAssignment = updateStreamsTargetAssignment (
2105- group ,
2106- groupEpoch ,
2107- Optional .of (updatedMember ),
2108- updatedConfiguredTopology ,
2109- metadataImage ,
2110- records ,
2111- currentAssignmentConfigs
2112- );
2113- targetAssignmentEpoch = groupEpoch ;
2114- }
2115- } else {
2116- targetAssignmentEpoch = group .assignmentEpoch ();
2117- targetAssignment = group .targetAssignment (updatedMember .memberId ());
2118- }
2106+ UpdateTargetAssignmentResult <TasksTuple > updateTargetAssignmentResult = maybeUpdateStreamsTargetAssignment (
2107+ group ,
2108+ groupEpoch ,
2109+ Optional .of (updatedMember ),
2110+ updatedConfiguredTopology ,
2111+ metadataImage ,
2112+ records ,
2113+ Optional .of (returnedStatus ),
2114+ currentAssignmentConfigs
2115+ );
21192116
21202117 // 5. Reconcile the member's assignment with the target assignment if the member is not
21212118 // fully reconciled yet.
@@ -2125,8 +2122,8 @@ private CoordinatorResult<StreamsGroupHeartbeatResult, CoordinatorRecord> stream
21252122 group ::currentActiveTaskProcessId ,
21262123 group ::currentStandbyTaskProcessIds ,
21272124 group ::currentWarmupTaskProcessIds ,
2128- targetAssignmentEpoch ,
2129- targetAssignment ,
2125+ updateTargetAssignmentResult . targetAssignmentEpoch () ,
2126+ updateTargetAssignmentResult . targetAssignment () ,
21302127 ownedActiveTasks ,
21312128 ownedStandbyTasks ,
21322129 ownedWarmupTasks ,
@@ -3940,17 +3937,38 @@ private UpdateTargetAssignmentResult<Assignment> maybeUpdateTargetAssignment(
39403937 * @param updatedMember The updated member (optional).
39413938 * @param metadataImage The metadata image.
39423939 * @param records The list to accumulate any new records.
3940+ * @param returnedStatus A mutable collection of status to be returned in the response.
39433941 * @return The new target assignment for the updated member, or EMPTY if no member specified.
39443942 */
3945- private TasksTuple updateStreamsTargetAssignment (
3943+ private UpdateTargetAssignmentResult < TasksTuple > maybeUpdateStreamsTargetAssignment (
39463944 StreamsGroup group ,
39473945 int groupEpoch ,
39483946 Optional <StreamsGroupMember > updatedMember ,
39493947 ConfiguredTopology configuredTopology ,
39503948 CoordinatorMetadataImage metadataImage ,
39513949 List <CoordinatorRecord > records ,
3950+ Optional <List <Status >> returnedStatus ,
39523951 Map <String , String > assignmentConfigs
39533952 ) {
3953+ boolean initialDelayActive = timer .isScheduled (streamsInitialRebalanceKey (group .groupId ()));
3954+ if (initialDelayActive ) {
3955+ returnedStatus .ifPresent (statusList -> statusList .add (
3956+ new Status ()
3957+ .setStatusCode (StreamsGroupHeartbeatResponse .Status .ASSIGNMENT_DELAYED .code ())
3958+ .setStatusDetail ("Assignment delayed due to the configured initial rebalance delay." )
3959+ ));
3960+
3961+ return new UpdateTargetAssignmentResult <>(
3962+ group .assignmentEpoch (),
3963+ TasksTuple .EMPTY
3964+ );
3965+ }
3966+
3967+ if (group .assignmentEpoch () >= groupEpoch ) {
3968+ // The assignment is up to date.
3969+ return UpdateTargetAssignmentResult .fromLastTargetAssignment (group , updatedMember );
3970+ }
3971+
39543972 TaskAssignor assignor = streamsGroupAssignor (group .groupId ());
39553973 try {
39563974 org .apache .kafka .coordinator .group .streams .TargetAssignmentBuilder assignmentResultBuilder =
@@ -3986,8 +4004,11 @@ private TasksTuple updateStreamsTargetAssignment(
39864004
39874005 records .addAll (assignmentResult .records ());
39884006
3989- return updatedMember .map (member -> assignmentResult .targetAssignment ().get (member .memberId ()))
3990- .orElse (TasksTuple .EMPTY );
4007+ return new UpdateTargetAssignmentResult <>(
4008+ groupEpoch ,
4009+ updatedMember .map (member -> assignmentResult .targetAssignment ().get (member .memberId ()))
4010+ .orElse (TasksTuple .EMPTY )
4011+ );
39914012 } catch (TaskAssignorException ex ) {
39924013 String msg = String .format ("Failed to compute a new target assignment for epoch %d: %s" ,
39934014 groupEpoch , ex .getMessage ());
@@ -4024,13 +4045,14 @@ private CoordinatorResult<Void, CoordinatorRecord> computeDelayedTargetAssignmen
40244045 }
40254046
40264047 List <CoordinatorRecord > records = new ArrayList <>();
4027- updateStreamsTargetAssignment (
4048+ maybeUpdateStreamsTargetAssignment (
40284049 group ,
40294050 group .groupEpoch (),
40304051 Optional .empty (),
40314052 group .configuredTopology ().get (),
40324053 metadataImage ,
40334054 records ,
4055+ Optional .empty (),
40344056 group .lastAssignmentConfigs ()
40354057 );
40364058
0 commit comments