Skip to content

Commit 2700824

Browse files
authored
KAFKA-20288 Remove orphaned groups from GroupConfigManager (#21758)
# Description `GroupConfigManager` stores the dynamic configs of groups. However, when all dynamic configs are removed the group id is not removed from the manager. This patch removes the orphan when the update group config is empty. # Test `testGroupIsRemovedWhenDynamicConfigsAreRemoved` checks whether the orphan group exists after updating using an empty config. Reviewers: Sean Quah <squah@confluent.io>, David Jacot <djacot@confluent.io>, Chia-Ping Tsai <chia7712@gmail.com>
1 parent e022849 commit 2700824

File tree

4 files changed

+82
-6
lines changed

4 files changed

+82
-6
lines changed

core/src/main/java/kafka/server/share/SharePartition.java

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -3040,12 +3040,9 @@ private long startOffsetDuringInitialization(long partitionDataStartOffset) {
30403040
if (partitionDataStartOffset != PartitionFactory.UNINITIALIZED_START_OFFSET) {
30413041
return partitionDataStartOffset;
30423042
}
3043-
ShareGroupAutoOffsetResetStrategy offsetResetStrategy;
3044-
if (groupConfigManager.groupConfig(groupId).isPresent()) {
3045-
offsetResetStrategy = groupConfigManager.groupConfig(groupId).get().shareAutoOffsetReset();
3046-
} else {
3047-
offsetResetStrategy = GroupConfig.defaultShareAutoOffsetReset();
3048-
}
3043+
ShareGroupAutoOffsetResetStrategy offsetResetStrategy = groupConfigManager.groupConfig(groupId)
3044+
.map(GroupConfig::shareAutoOffsetReset)
3045+
.orElseGet(GroupConfig::defaultShareAutoOffsetReset);
30493046

30503047
if (offsetResetStrategy.type() == ShareGroupAutoOffsetResetStrategy.StrategyType.LATEST) {
30513048
return offsetForLatestTimestamp(topicIdPartition, replicaManager, leaderEpoch);

group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupConfigManager.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,11 @@ public void updateGroupConfig(String groupId, Properties newGroupConfig) {
6464
throw new InvalidRequestException("Group name can't be empty.");
6565
}
6666

67+
if (newGroupConfig.isEmpty()) {
68+
configMap.remove(groupId);
69+
return;
70+
}
71+
6772
// Evaluate ensures configs respect broker-level bounds. For the Admin API path,
6873
// values are pre-validated so this is effectively a no-op. For the broker startup
6974
// path, configs from metadata may need evaluation if bounds have changed.

group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupConfigManagerTest.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -115,6 +115,21 @@ public void testClampWithCustomBrokerBounds() {
115115
assertEquals(49000, configManager.groupConfig(groupId).get().getInt(CONSUMER_SESSION_TIMEOUT_MS_CONFIG));
116116
}
117117

118+
@Test
119+
public void testGroupIsRemovedWhenDynamicConfigsAreRemoved() {
120+
String groupId1 = "foo";
121+
String groupId2 = "bar";
122+
Properties props = new Properties();
123+
props.put(CONSUMER_SESSION_TIMEOUT_MS_CONFIG, 50000);
124+
configManager.updateGroupConfig(groupId1, props);
125+
configManager.updateGroupConfig(groupId2, props);
126+
assertTrue(configManager.groupIds().contains(groupId1));
127+
128+
configManager.updateGroupConfig(groupId1, new Properties());
129+
assertFalse(configManager.groupIds().contains(groupId1));
130+
assertTrue(configManager.groupIds().contains(groupId2));
131+
}
132+
118133
public static GroupConfigManager createConfigManager() {
119134
return createConfigManager(new HashMap<>());
120135
}
Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.apache.kafka.server.config;
18+
19+
import org.apache.kafka.clients.admin.Admin;
20+
import org.apache.kafka.clients.admin.AlterConfigOp;
21+
import org.apache.kafka.clients.admin.ConfigEntry;
22+
import org.apache.kafka.clients.admin.ListConfigResourcesOptions;
23+
import org.apache.kafka.common.config.ConfigResource;
24+
import org.apache.kafka.common.test.ClusterInstance;
25+
import org.apache.kafka.common.test.api.ClusterTest;
26+
import org.apache.kafka.test.TestUtils;
27+
28+
import java.util.List;
29+
import java.util.Map;
30+
import java.util.Set;
31+
import java.util.concurrent.ExecutionException;
32+
33+
import static org.junit.jupiter.api.Assertions.assertEquals;
34+
35+
public class DynamicConfigTest {
36+
@ClusterTest
37+
public void testGroupIsRemovedWhenDynamicConfigsAreRemoved(ClusterInstance clusterInstance) throws ExecutionException, InterruptedException {
38+
try (Admin admin = clusterInstance.admin()) {
39+
var cr = new ConfigResource(ConfigResource.Type.GROUP, "gp");
40+
assertEquals(List.of(), admin.listConfigResources(Set.of(ConfigResource.Type.GROUP), new ListConfigResourcesOptions()).all().get());
41+
42+
// add dynamic config
43+
admin.incrementalAlterConfigs(Map.of(cr, List.of(new AlterConfigOp(
44+
new ConfigEntry("consumer.session.timeout.ms", "45001"), AlterConfigOp.OpType.SET))))
45+
.all()
46+
.get();
47+
TestUtils.waitForCondition(() -> !admin.listConfigResources(Set.of(ConfigResource.Type.GROUP), new ListConfigResourcesOptions()).all().get().isEmpty(),
48+
"Should include a group with dynamic config");
49+
50+
// remove dynamic config
51+
admin.incrementalAlterConfigs(Map.of(cr, List.of(new AlterConfigOp(
52+
new ConfigEntry("consumer.session.timeout.ms", null), AlterConfigOp.OpType.DELETE))))
53+
.all()
54+
.get();
55+
TestUtils.waitForCondition(() -> admin.listConfigResources(Set.of(ConfigResource.Type.GROUP), new ListConfigResourcesOptions()).all().get().isEmpty(),
56+
"Should not include any group");
57+
}
58+
}
59+
}

0 commit comments

Comments
 (0)