Skip to content

Commit 4ebf018

Browse files
authored
KAFKA-18608: Add Support for OAuth Client Assertion to client_credentials Grant Type (#21483)
## What Implements [KIP-1258](https://cwiki.apache.org/confluence/display/KAFKA/KIP-1258%3A+Add+Support+for+OAuth+Client+Assertion+to+client_credentials+Grant+Type): Add support for OAuth 2.0 client assertion authentication (RFC 7523§2.2) as a more secure alternative to client secrets. ## Key Changes ### Core Implementation - **New**: `ClientAssertionRequestFormatter` - Formats HTTP requests with client assertion parameters - **Enhanced**: `ClientCredentialsRequestFormatterFactory` - Three-tier fallback mechanism with logging - **Renamed**: `ClientCredentialsRequestFormatter` →`ClientSecretRequestFormatter` (internal class) ### Three-Tier Fallback 1. File-based assertion (`sasl.oauthbearer.assertion.file`) 2. Dynamically-generated assertion (`sasl.oauthbearer.assertion.claim.iss` + private key) 3. Client secret (backward compatible fallback) ### Infrastructure - Reuses KIP-1139 assertion creation/signing/caching - No new configuration properties required - Supports RS256 and ES256 algorithms - Automatic private key file reloading ## Testing - ✅ RFC 7523 compliance verified - ✅ Backward compatibility validated ## Compatibility - ✅ 100% backward compatible - ✅ No public API changes - ✅ No broker changes required - ✅ Client-side only implementation ## Configuration Example ```properties # Client Assertion (Recommended) sasl.oauthbearer.token.endpoint.url=https://idp.com/oauth/token sasl.oauthbearer.assertion.private.key.file=/path/to/key.pem sasl.oauthbearer.assertion.algorithm=RS256 sasl.oauthbearer.assertion.claim.iss=kafka-client sasl.oauthbearer.assertion.claim.sub=service-account sasl.oauthbearer.assertion.claim.aud=https://idp.com # Client Secret (Still Works) sasl.oauthbearer.client.credentials.client.id=my-client sasl.oauthbearer.client.credentials.client.secret=my-secret ``` ## References - **JIRA**: [KAFKA-18608](https://issues.apache.org/jira/browse/KAFKA-18608) - **KIP**: [KIP-1258](https://cwiki.apache.org/confluence/display/KAFKA/KIP-1258) - **RFC 7521**: [Assertion Framework for OAuth2.0](https://datatracker.ietf.org/doc/html/rfc7521) - **RFC 7523**: [JWT Profile for OAuth 2.0 Client Authentication](https://datatracker.ietf.org/doc/html/rfc7523) - **Related**: [KIP-1139 (jwt-bearer grant)](https://cwiki.apache.org/confluence/display/KAFKA/KIP-1139) Reviewers: Manikumar Reddy <manikumar.reddy@gmail.com>, Kirk True <kirk@kirktrue.pro>
1 parent 5729bb6 commit 4ebf018

26 files changed

+3347
-209
lines changed

build.gradle

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1900,6 +1900,8 @@ project(':clients') {
19001900
testImplementation libs.mockitoCore
19011901
testImplementation libs.mockitoJunitJupiter // supports MockitoExtension
19021902
testImplementation testLog4j2Libs
1903+
testImplementation libs.testcontainersKeycloak
1904+
testImplementation libs.testcontainersJunitJupiter
19031905

19041906
testCompileOnly libs.bndlib
19051907

checkstyle/import-control.xml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -137,6 +137,9 @@
137137
<allow pkg="com.fasterxml.jackson.databind" />
138138
<allow pkg="org.jose4j" />
139139
<allow pkg="javax.crypto"/>
140+
<allow pkg="org.testcontainers" />
141+
<allow pkg="org.keycloak" />
142+
<allow pkg="dasniko.testcontainers" />
140143
</subpackage>
141144
</subpackage>
142145

clients/src/main/java/org/apache/kafka/common/security/oauthbearer/ClientCredentialsJwtRetriever.java

Lines changed: 14 additions & 132 deletions
Original file line numberDiff line numberDiff line change
@@ -17,13 +17,12 @@
1717

1818
package org.apache.kafka.common.security.oauthbearer;
1919

20-
import org.apache.kafka.common.config.ConfigException;
21-
import org.apache.kafka.common.config.SaslConfigs;
22-
import org.apache.kafka.common.security.oauthbearer.internals.secured.ClientCredentialsRequestFormatter;
20+
import org.apache.kafka.common.security.oauthbearer.internals.secured.ClientCredentialsRequestFormatterFactory;
2321
import org.apache.kafka.common.security.oauthbearer.internals.secured.ConfigurationUtils;
2422
import org.apache.kafka.common.security.oauthbearer.internals.secured.HttpJwtRetriever;
2523
import org.apache.kafka.common.security.oauthbearer.internals.secured.HttpRequestFormatter;
2624
import org.apache.kafka.common.security.oauthbearer.internals.secured.JaasOptionsUtils;
25+
import org.apache.kafka.common.utils.Time;
2726
import org.apache.kafka.common.utils.Utils;
2827

2928
import org.slf4j.Logger;
@@ -32,21 +31,9 @@
3231
import java.io.IOException;
3332
import java.util.List;
3433
import java.util.Map;
35-
import java.util.Objects;
36-
import java.util.function.Function;
3734

3835
import javax.security.auth.login.AppConfigurationEntry;
3936

40-
import static org.apache.kafka.common.config.SaslConfigs.DEFAULT_SASL_OAUTHBEARER_HEADER_URLENCODE;
41-
import static org.apache.kafka.common.config.SaslConfigs.SASL_JAAS_CONFIG;
42-
import static org.apache.kafka.common.config.SaslConfigs.SASL_OAUTHBEARER_CLIENT_CREDENTIALS_CLIENT_ID;
43-
import static org.apache.kafka.common.config.SaslConfigs.SASL_OAUTHBEARER_CLIENT_CREDENTIALS_CLIENT_SECRET;
44-
import static org.apache.kafka.common.config.SaslConfigs.SASL_OAUTHBEARER_HEADER_URLENCODE;
45-
import static org.apache.kafka.common.config.SaslConfigs.SASL_OAUTHBEARER_SCOPE;
46-
import static org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginCallbackHandler.CLIENT_ID_CONFIG;
47-
import static org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginCallbackHandler.CLIENT_SECRET_CONFIG;
48-
import static org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginCallbackHandler.SCOPE_CONFIG;
49-
5037
/**
5138
* {@code ClientCredentialsJwtRetriever} is a {@link JwtRetriever} that performs the steps to request
5239
* a JWT from an OAuth/OIDC identity provider using the <code>client_credentials</code> grant type. This
@@ -109,27 +96,26 @@ public class ClientCredentialsJwtRetriever implements JwtRetriever {
10996

11097
private static final Logger LOG = LoggerFactory.getLogger(ClientCredentialsJwtRetriever.class);
11198

99+
private final Time time;
112100
private HttpJwtRetriever delegate;
113101

102+
public ClientCredentialsJwtRetriever() {
103+
this(Time.SYSTEM);
104+
}
105+
106+
public ClientCredentialsJwtRetriever(Time time) {
107+
this.time = time;
108+
}
109+
114110
@Override
115111
public void configure(Map<String, ?> configs, String saslMechanism, List<AppConfigurationEntry> jaasConfigEntries) {
116112
ConfigurationUtils cu = new ConfigurationUtils(configs, saslMechanism);
117113
JaasOptionsUtils jou = new JaasOptionsUtils(saslMechanism, jaasConfigEntries);
118114

119-
ConfigOrJaas configOrJaas = new ConfigOrJaas(cu, jou);
120-
String clientId = configOrJaas.clientId();
121-
String clientSecret = configOrJaas.clientSecret();
122-
String scope = configOrJaas.scope();
123-
boolean urlencodeHeader = validateUrlencodeHeader(cu);
124-
125-
HttpRequestFormatter requestFormatter = new ClientCredentialsRequestFormatter(
126-
clientId,
127-
clientSecret,
128-
scope,
129-
urlencodeHeader
130-
);
131-
115+
HttpRequestFormatter requestFormatter = ClientCredentialsRequestFormatterFactory.create(cu, jou, time);
132116
delegate = new HttpJwtRetriever(requestFormatter);
117+
118+
LOG.debug("Created instance of {} as delegate", delegate.getClass().getName());
133119
delegate.configure(configs, saslMechanism, jaasConfigEntries);
134120
}
135121

@@ -145,108 +131,4 @@ public String retrieve() throws JwtRetrieverException {
145131
public void close() throws IOException {
146132
Utils.closeQuietly(delegate, "JWT retriever delegate");
147133
}
148-
149-
/**
150-
* In some cases, the incoming {@link Map} doesn't contain a value for
151-
* {@link SaslConfigs#SASL_OAUTHBEARER_HEADER_URLENCODE}. Returning {@code null} from {@link Map#get(Object)}
152-
* will cause a {@link NullPointerException} when it is later unboxed.
153-
*
154-
* <p/>
155-
*
156-
* This utility method ensures that we have a non-{@code null} value to use in the
157-
* {@link HttpJwtRetriever} constructor.
158-
*/
159-
static boolean validateUrlencodeHeader(ConfigurationUtils configurationUtils) {
160-
Boolean urlencodeHeader = configurationUtils.get(SASL_OAUTHBEARER_HEADER_URLENCODE);
161-
return Objects.requireNonNullElse(urlencodeHeader, DEFAULT_SASL_OAUTHBEARER_HEADER_URLENCODE);
162-
}
163-
164-
/**
165-
* Retrieves the values first from configuration, then falls back to JAAS, and, if required, throws an error.
166-
*/
167-
private static class ConfigOrJaas {
168-
169-
private final ConfigurationUtils cu;
170-
private final JaasOptionsUtils jou;
171-
172-
private ConfigOrJaas(ConfigurationUtils cu, JaasOptionsUtils jou) {
173-
this.cu = cu;
174-
this.jou = jou;
175-
}
176-
177-
private String clientId() {
178-
return getValue(
179-
SASL_OAUTHBEARER_CLIENT_CREDENTIALS_CLIENT_ID,
180-
CLIENT_ID_CONFIG,
181-
true,
182-
cu::validateString,
183-
jou::validateString
184-
);
185-
}
186-
187-
private String clientSecret() {
188-
return getValue(
189-
SASL_OAUTHBEARER_CLIENT_CREDENTIALS_CLIENT_SECRET,
190-
CLIENT_SECRET_CONFIG,
191-
true,
192-
cu::validatePassword,
193-
jou::validateString
194-
);
195-
}
196-
197-
private String scope() {
198-
return getValue(
199-
SASL_OAUTHBEARER_SCOPE,
200-
SCOPE_CONFIG,
201-
false,
202-
cu::validateString,
203-
jou::validateString
204-
);
205-
}
206-
207-
private String getValue(String configName,
208-
String jaasName,
209-
boolean isRequired,
210-
Function<String, String> configValueGetter,
211-
Function<String, String> jaasValueGetter) {
212-
boolean isPresentInConfig = cu.containsKey(configName);
213-
boolean isPresentInJaas = jou.containsKey(jaasName);
214-
215-
if (isPresentInConfig) {
216-
if (isPresentInJaas) {
217-
// Log if the user is using the deprecated JAAS option.
218-
LOG.warn(
219-
"Both the OAuth configuration {} as well as the JAAS option {} (from the {} configuration) were provided. " +
220-
"Since the {} JAAS option is deprecated, it will be ignored and the value from the {} configuration will be used. " +
221-
"Please update your configuration to only use {}.",
222-
configName,
223-
jaasName,
224-
SASL_JAAS_CONFIG,
225-
jaasName,
226-
configName,
227-
configName
228-
);
229-
}
230-
231-
return configValueGetter.apply(configName);
232-
} else if (isPresentInJaas) {
233-
String value = jaasValueGetter.apply(jaasName);
234-
235-
// Log if the user is using the deprecated JAAS option.
236-
LOG.warn(
237-
"The OAuth JAAS option {} was configured in {}, but that JAAS option is deprecated and will be removed. " +
238-
"Please update your configuration to use the {} configuration instead.",
239-
jaasName,
240-
SASL_JAAS_CONFIG,
241-
configName
242-
);
243-
244-
return value;
245-
} else if (isRequired) {
246-
throw new ConfigException(configName, null);
247-
} else {
248-
return null;
249-
}
250-
}
251-
}
252134
}

clients/src/main/java/org/apache/kafka/common/security/oauthbearer/DefaultJwtRetriever.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,8 @@
1717

1818
package org.apache.kafka.common.security.oauthbearer;
1919

20-
import org.apache.kafka.common.security.oauthbearer.internals.secured.ClientCredentialsRequestFormatter;
20+
import org.apache.kafka.common.security.oauthbearer.internals.secured.ClientAssertionRequestFormatter;
21+
import org.apache.kafka.common.security.oauthbearer.internals.secured.ClientSecretRequestFormatter;
2122
import org.apache.kafka.common.security.oauthbearer.internals.secured.ConfigurationUtils;
2223
import org.apache.kafka.common.utils.Utils;
2324

@@ -43,8 +44,8 @@
4344
* If the value of <code>sasl.oauthbearer.token.endpoint.url</code> is set to a value that starts with the
4445
* <code>file</code> protocol (e.g. <code>file:/tmp/path/to/a/static-jwt.json</code>), an instance of
4546
* {@link FileJwtRetriever} will be used as the underlying {@link JwtRetriever}. Otherwise, the URL is
46-
* assumed to be an HTTP/HTTPS-based URL, and an instance of {@link ClientCredentialsRequestFormatter} will
47-
* be created and used.
47+
* assumed to be an HTTP/HTTPS-based URL, and an instance of {@link ClientAssertionRequestFormatter} or
48+
* {@link ClientSecretRequestFormatter} will be created and used.
4849
* </li>
4950
* </ul>
5051
*

clients/src/main/java/org/apache/kafka/common/security/oauthbearer/JwtBearerJwtRetriever.java

Lines changed: 11 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -21,29 +21,21 @@
2121
import org.apache.kafka.common.security.oauthbearer.internals.secured.HttpJwtRetriever;
2222
import org.apache.kafka.common.security.oauthbearer.internals.secured.HttpRequestFormatter;
2323
import org.apache.kafka.common.security.oauthbearer.internals.secured.JwtBearerRequestFormatter;
24-
import org.apache.kafka.common.security.oauthbearer.internals.secured.assertion.AssertionCreator;
25-
import org.apache.kafka.common.security.oauthbearer.internals.secured.assertion.AssertionJwtTemplate;
26-
import org.apache.kafka.common.security.oauthbearer.internals.secured.assertion.DefaultAssertionCreator;
27-
import org.apache.kafka.common.security.oauthbearer.internals.secured.assertion.FileAssertionCreator;
28-
import org.apache.kafka.common.security.oauthbearer.internals.secured.assertion.StaticAssertionJwtTemplate;
24+
import org.apache.kafka.common.security.oauthbearer.internals.secured.assertion.AssertionSupplierFactory;
25+
import org.apache.kafka.common.security.oauthbearer.internals.secured.assertion.CloseableSupplier;
2926
import org.apache.kafka.common.utils.Time;
3027
import org.apache.kafka.common.utils.Utils;
3128

32-
import java.io.File;
29+
import org.slf4j.Logger;
30+
import org.slf4j.LoggerFactory;
31+
3332
import java.io.IOException;
3433
import java.util.List;
3534
import java.util.Map;
36-
import java.util.Optional;
37-
import java.util.function.Supplier;
3835

3936
import javax.security.auth.login.AppConfigurationEntry;
4037

41-
import static org.apache.kafka.common.config.SaslConfigs.SASL_OAUTHBEARER_ASSERTION_ALGORITHM;
42-
import static org.apache.kafka.common.config.SaslConfigs.SASL_OAUTHBEARER_ASSERTION_FILE;
43-
import static org.apache.kafka.common.config.SaslConfigs.SASL_OAUTHBEARER_ASSERTION_PRIVATE_KEY_FILE;
44-
import static org.apache.kafka.common.config.SaslConfigs.SASL_OAUTHBEARER_ASSERTION_PRIVATE_KEY_PASSPHRASE;
4538
import static org.apache.kafka.common.config.SaslConfigs.SASL_OAUTHBEARER_SCOPE;
46-
import static org.apache.kafka.common.security.oauthbearer.internals.secured.assertion.AssertionUtils.layeredAssertionJwtTemplate;
4739

4840
/**
4941
* {@code JwtBearerJwtRetriever} is a {@link JwtRetriever} that performs the steps to request
@@ -116,10 +108,11 @@
116108
*/
117109
public class JwtBearerJwtRetriever implements JwtRetriever {
118110

111+
private static final Logger LOG = LoggerFactory.getLogger(JwtBearerJwtRetriever.class);
112+
119113
private final Time time;
120114
private HttpJwtRetriever delegate;
121-
private AssertionJwtTemplate assertionJwtTemplate;
122-
private AssertionCreator assertionCreator;
115+
private CloseableSupplier<String> assertionSupplier;
123116

124117
public JwtBearerJwtRetriever() {
125118
this(Time.SYSTEM);
@@ -135,32 +128,12 @@ public void configure(Map<String, ?> configs, String saslMechanism, List<AppConf
135128

136129
String scope = cu.validateString(SASL_OAUTHBEARER_SCOPE, false);
137130

138-
if (cu.validateString(SASL_OAUTHBEARER_ASSERTION_FILE, false) != null) {
139-
File assertionFile = cu.validateFile(SASL_OAUTHBEARER_ASSERTION_FILE);
140-
assertionCreator = new FileAssertionCreator(assertionFile);
141-
assertionJwtTemplate = new StaticAssertionJwtTemplate();
142-
} else {
143-
String algorithm = cu.validateString(SASL_OAUTHBEARER_ASSERTION_ALGORITHM);
144-
File privateKeyFile = cu.validateFile(SASL_OAUTHBEARER_ASSERTION_PRIVATE_KEY_FILE);
145-
Optional<String> passphrase = cu.containsKey(SASL_OAUTHBEARER_ASSERTION_PRIVATE_KEY_PASSPHRASE) ?
146-
Optional.of(cu.validatePassword(SASL_OAUTHBEARER_ASSERTION_PRIVATE_KEY_PASSPHRASE)) :
147-
Optional.empty();
148-
149-
assertionCreator = new DefaultAssertionCreator(algorithm, privateKeyFile, passphrase);
150-
assertionJwtTemplate = layeredAssertionJwtTemplate(cu, time);
151-
}
152-
153-
Supplier<String> assertionSupplier = () -> {
154-
try {
155-
return assertionCreator.create(assertionJwtTemplate);
156-
} catch (Exception e) {
157-
throw new JwtRetrieverException(e);
158-
}
159-
};
131+
assertionSupplier = AssertionSupplierFactory.create(cu, time);
160132

161133
HttpRequestFormatter requestFormatter = new JwtBearerRequestFormatter(scope, assertionSupplier);
162134

163135
delegate = new HttpJwtRetriever(requestFormatter);
136+
LOG.debug("Created instance of {} as delegate", delegate.getClass().getName());
164137
delegate.configure(configs, saslMechanism, jaasConfigEntries);
165138
}
166139

@@ -174,8 +147,7 @@ public String retrieve() throws JwtRetrieverException {
174147

175148
@Override
176149
public void close() throws IOException {
177-
Utils.closeQuietly(assertionCreator, "JWT assertion creator");
178-
Utils.closeQuietly(assertionJwtTemplate, "JWT assertion template");
150+
Utils.closeQuietly(assertionSupplier, "JWT assertion supplier");
179151
Utils.closeQuietly(delegate, "JWT retriever delegate");
180152
}
181153
}

0 commit comments

Comments
 (0)