Skip to content

Commit e022849

Browse files
authored
KAFKA-20194: Update Kafka Streams DSL to use header-stores (#21580)
Implements KIP-1285 by updating the Kafka Streams DSL processors to use the new header-state-stores in place of the currently used ts-state-stores.

Reviewers: Uladzislau Blok <blokv75@gmail.com>, Zheguang Zhao <zheguang.zhao@gmail.com>, TengYao Chi <frankvicky@apache.org>, Matthias J. Sax <matthias@confluent.io>
1 parent a40c876 commit e022849

File tree

73 files changed

+1686
-1211
lines changed

Some content is hidden

Large commits have some content hidden by default. Use the search box below to find content that may be hidden.

73 files changed

+1686
-1211
lines changed

streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/IQv2StoreIntegrationTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1315,7 +1315,7 @@ private <T> void shouldHandleWindowRangeDSLQueries(final Function<T, Integer> ex
13151315
assertThat(partitionResult.getFailureReason(), is(FailureReason.UNKNOWN_QUERY_TYPE));
13161316
assertThat(partitionResult.getFailureMessage(), matchesPattern(
13171317
"This store"
1318-
+ " \\(class org.apache.kafka.streams.state.internals.Metered.*WindowStore\\)"
1318+
+ " \\(class org.apache.kafka.streams.state.internals.Metered.*WindowStore.*\\)"
13191319
+ " doesn't know how to execute the given query"
13201320
+ " \\(WindowRangeQuery\\{key=Optional\\[2], timeFrom=Optional.empty, timeTo=Optional.empty}\\)"
13211321
+ " because WindowStores only supports WindowRangeQuery.withWindowStartRange\\."

streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -521,7 +521,7 @@ public void shouldRejectWronglyTypedStore(final TestInfo testInfo) throws Interr
521521
"Cannot get state store " + storeName + " because the queryable store type" +
522522
" [class org.apache.kafka.streams.state.QueryableStoreTypes$SessionStoreType]" +
523523
" does not accept the actual store type" +
524-
" [class org.apache.kafka.streams.state.internals.MeteredTimestampedKeyValueStore]."
524+
" [class org.apache.kafka.streams.state.internals.MeteredTimestampedKeyValueStoreWithHeaders]."
525525
)
526526
);
527527
}

streams/src/main/java/org/apache/kafka/streams/kstream/internals/AbstractKStreamTimeWindowAggregateProcessor.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -33,8 +33,8 @@
3333
import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
3434
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
3535
import org.apache.kafka.streams.state.KeyValueIterator;
36-
import org.apache.kafka.streams.state.TimestampedWindowStore;
37-
import org.apache.kafka.streams.state.ValueAndTimestamp;
36+
import org.apache.kafka.streams.state.TimestampedWindowStoreWithHeaders;
37+
import org.apache.kafka.streams.state.ValueTimestampHeaders;
3838

3939
import org.slf4j.Logger;
4040

@@ -52,7 +52,7 @@ public abstract class AbstractKStreamTimeWindowAggregateProcessor<KIn, VIn, VAgg
5252
protected final TimeTracker timeTracker = new TimeTracker();
5353

5454
private TimestampedTupleForwarder<Windowed<KIn>, VAgg> tupleForwarder;
55-
protected TimestampedWindowStore<KIn, VAgg> windowStore;
55+
protected TimestampedWindowStoreWithHeaders<KIn, VAgg> windowStore;
5656
protected Sensor droppedRecordsSensor;
5757
protected Sensor emittedRecordsSensor;
5858
protected Sensor emitFinalLatencySensor;
@@ -98,7 +98,7 @@ public void init(final ProcessorContext<Windowed<KIn>, Change<VAgg>> context) {
9898
tupleForwarder = new TimestampedTupleForwarder<>(
9999
windowStore,
100100
context,
101-
new TimestampedCacheFlushListener<>(context),
101+
new TimestampedCacheFlushListenerWithHeaders<>(context),
102102
sendOldValues);
103103
}
104104
}
@@ -201,13 +201,13 @@ private void fetchAndEmit(final Record<KIn, VIn> record,
201201
final long emitRangeUpperBound) {
202202
final long startMs = time.milliseconds();
203203

204-
try (final KeyValueIterator<Windowed<KIn>, ValueAndTimestamp<VAgg>> windowToEmit
204+
try (final KeyValueIterator<Windowed<KIn>, ValueTimestampHeaders<VAgg>> windowToEmit
205205
= windowStore.fetchAll(emitRangeLowerBound, emitRangeUpperBound)) {
206206

207207
int emittedCount = 0;
208208
while (windowToEmit.hasNext()) {
209209
emittedCount++;
210-
final KeyValue<Windowed<KIn>, ValueAndTimestamp<VAgg>> kv = windowToEmit.next();
210+
final KeyValue<Windowed<KIn>, ValueTimestampHeaders<VAgg>> kv = windowToEmit.next();
211211

212212
tupleForwarder.maybeForward(
213213
record.withKey(kv.key)

streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamAggregate.java

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
*/
1717
package org.apache.kafka.streams.kstream.internals;
1818

19+
import org.apache.kafka.common.header.internals.RecordHeaders;
1920
import org.apache.kafka.common.metrics.Sensor;
2021
import org.apache.kafka.streams.kstream.Aggregator;
2122
import org.apache.kafka.streams.kstream.Initializer;
@@ -28,7 +29,7 @@
2829
import org.apache.kafka.streams.processor.internals.StoreFactory.FactoryWrappingStoreBuilder;
2930
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
3031
import org.apache.kafka.streams.state.StoreBuilder;
31-
import org.apache.kafka.streams.state.ValueAndTimestamp;
32+
import org.apache.kafka.streams.state.ValueTimestampHeaders;
3233
import org.apache.kafka.streams.state.internals.KeyValueStoreWrapper;
3334

3435
import org.slf4j.Logger;
@@ -38,7 +39,7 @@
3839
import java.util.Set;
3940

4041
import static org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensor;
41-
import static org.apache.kafka.streams.state.ValueAndTimestamp.getValueOrNull;
42+
import static org.apache.kafka.streams.state.ValueTimestampHeaders.getValueOrNull;
4243
import static org.apache.kafka.streams.state.VersionedKeyValueStore.PUT_RETURN_CODE_NOT_PUT;
4344
import static org.apache.kafka.streams.state.internals.KeyValueStoreWrapper.PUT_RETURN_CODE_IS_LATEST;
4445

@@ -94,7 +95,7 @@ public void init(final ProcessorContext<KIn, Change<VAgg>> context) {
9495
tupleForwarder = new TimestampedTupleForwarder<>(
9596
store.store(),
9697
context,
97-
new TimestampedCacheFlushListener<>(context),
98+
new TimestampedCacheFlushListenerWithHeaders<>(context),
9899
sendOldValues);
99100
}
100101

@@ -118,7 +119,7 @@ public void process(final Record<KIn, VIn> record) {
118119
return;
119120
}
120121

121-
final ValueAndTimestamp<VAgg> oldAggAndTimestamp = store.get(record.key());
122+
final ValueTimestampHeaders<VAgg> oldAggAndTimestamp = store.get(record.key());
122123
VAgg oldAgg = getValueOrNull(oldAggAndTimestamp);
123124

124125
final VAgg newAgg;
@@ -134,7 +135,7 @@ public void process(final Record<KIn, VIn> record) {
134135

135136
newAgg = aggregator.apply(record.key(), record.value(), oldAgg);
136137

137-
final long putReturnCode = store.put(record.key(), newAgg, newTimestamp);
138+
final long putReturnCode = store.put(record.key(), newAgg, newTimestamp, new RecordHeaders());
138139
// if not put to store, do not forward downstream either
139140
if (putReturnCode != PUT_RETURN_CODE_NOT_PUT) {
140141
tupleForwarder.maybeForward(
@@ -168,12 +169,12 @@ public void init(final ProcessorContext<?, ?> context) {
168169
}
169170

170171
@Override
171-
public ValueAndTimestamp<VAgg> get(final KIn key) {
172+
public ValueTimestampHeaders<VAgg> get(final KIn key) {
172173
return store.get(key);
173174
}
174175

175176
@Override
176-
public ValueAndTimestamp<VAgg> get(final KIn key, final long asOfTimestamp) {
177+
public ValueTimestampHeaders<VAgg> get(final KIn key, final long asOfTimestamp) {
177178
return store.get(key, asOfTimestamp);
178179
}
179180

streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImplJoin.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -264,7 +264,7 @@ private void assertUniqueStoreNames(final WindowBytesStoreSupplier supplier,
264264
private static <K, V> StoreFactory joinWindowStoreBuilderFromSupplier(final WindowBytesStoreSupplier storeSupplier,
265265
final Serde<K> keySerde,
266266
final Serde<V> valueSerde) {
267-
return StoreBuilderWrapper.wrapStoreBuilder(Stores.windowStoreBuilder(
267+
return StoreBuilderWrapper.wrapStoreBuilder(Stores.timestampedWindowStoreWithHeadersBuilder(
268268
storeSupplier,
269269
keySerde,
270270
valueSerde

streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamJoinWindow.java

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,8 @@
2424
import org.apache.kafka.streams.processor.internals.StoreFactory;
2525
import org.apache.kafka.streams.processor.internals.StoreFactory.FactoryWrappingStoreBuilder;
2626
import org.apache.kafka.streams.state.StoreBuilder;
27-
import org.apache.kafka.streams.state.WindowStore;
27+
import org.apache.kafka.streams.state.TimestampedWindowStoreWithHeaders;
28+
import org.apache.kafka.streams.state.ValueTimestampHeaders;
2829

2930
import java.util.Collections;
3031
import java.util.Set;
@@ -49,12 +50,11 @@ public Processor<K, V, K, V> get() {
4950

5051
private class KStreamJoinWindowProcessor extends ContextualProcessor<K, V, K, V> {
5152

52-
private WindowStore<K, V> window;
53+
private TimestampedWindowStoreWithHeaders<K, V> window;
5354

5455
@Override
5556
public void init(final ProcessorContext<K, V> context) {
5657
super.init(context);
57-
5858
window = context.getStateStore(thisWindowStoreFactory.storeName());
5959
}
6060

@@ -65,7 +65,9 @@ public void process(final Record<K, V> record) {
6565
context().forward(record);
6666
if (record.key() != null) {
6767
// Every record basically starts a new window. We're using a window store mostly for the retention.
68-
window.put(record.key(), record.value(), record.timestamp());
68+
window.put(record.key(),
69+
ValueTimestampHeaders.make(record.value(), record.timestamp(), record.headers()),
70+
record.timestamp());
6971
}
7072
}
7173
}

streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,8 @@
3333
import org.apache.kafka.streams.state.KeyValueIterator;
3434
import org.apache.kafka.streams.state.KeyValueStore;
3535
import org.apache.kafka.streams.state.StoreBuilder;
36-
import org.apache.kafka.streams.state.WindowStore;
36+
import org.apache.kafka.streams.state.TimestampedWindowStoreWithHeaders;
37+
import org.apache.kafka.streams.state.ValueTimestampHeaders;
3738
import org.apache.kafka.streams.state.WindowStoreIterator;
3839
import org.apache.kafka.streams.state.internals.LeftOrRightValue;
3940
import org.apache.kafka.streams.state.internals.TimestampedKeyAndJoinSide;
@@ -99,7 +100,7 @@ public Set<StoreBuilder<?>> stores() {
99100
}
100101

101102
protected abstract class KStreamKStreamJoinProcessor extends ContextualProcessor<K, VThis, K, VOut> {
102-
private WindowStore<K, VOther> otherWindowStore;
103+
private TimestampedWindowStoreWithHeaders<K, VOther> otherWindowStore;
103104
private Sensor droppedRecordsSensor;
104105
private Optional<KeyValueStore<TimestampedKeyAndJoinSide<K>, LeftOrRightValue<VLeft, VRight>>> outerJoinStore = Optional.empty();
105106
private InternalProcessorContext<K, VOut> internalProcessorContext;
@@ -149,9 +150,13 @@ public void process(final Record<K, VThis> record) {
149150

150151
final long timeFrom = Math.max(0L, inputRecordTimestamp - joinBeforeMs);
151152
final long timeTo = Math.max(0L, inputRecordTimestamp + joinAfterMs);
152-
try (final WindowStoreIterator<VOther> iter = otherWindowStore.fetch(record.key(), timeFrom, timeTo)) {
153+
try (final WindowStoreIterator<ValueTimestampHeaders<VOther>> iter = otherWindowStore.fetch(record.key(), timeFrom, timeTo)) {
153154
final boolean needOuterJoin = outer && !iter.hasNext();
154-
iter.forEachRemaining(otherRecord -> emitInnerJoin(record, otherRecord, inputRecordTimestamp));
155+
iter.forEachRemaining(otherRecord -> {
156+
// Extract value from ValueTimestampHeaders wrapper
157+
final VOther otherValue = otherRecord.value == null ? null : otherRecord.value.value();
158+
emitInnerJoin(record, new KeyValue<>(otherRecord.key, otherValue), inputRecordTimestamp);
159+
});
155160

156161
if (needOuterJoin) {
157162
// The maxStreamTime contains the max time observed in both sides of the join.

streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamSelfJoin.java

Lines changed: 11 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,8 @@
2929
import org.apache.kafka.streams.processor.internals.StoreFactory.FactoryWrappingStoreBuilder;
3030
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
3131
import org.apache.kafka.streams.state.StoreBuilder;
32-
import org.apache.kafka.streams.state.WindowStore;
32+
import org.apache.kafka.streams.state.TimestampedWindowStoreWithHeaders;
33+
import org.apache.kafka.streams.state.ValueTimestampHeaders;
3334
import org.apache.kafka.streams.state.WindowStoreIterator;
3435

3536
import org.slf4j.Logger;
@@ -76,7 +77,7 @@ public Processor<K, V1, K, VOut> get() {
7677

7778
private class KStreamKStreamSelfJoinProcessor extends ContextualProcessor<K, V1, K, VOut> {
7879
private final TimeTracker timeTracker = new TimeTracker();
79-
private WindowStore<K, V2> windowStore;
80+
private TimestampedWindowStoreWithHeaders<K, V2> windowStore;
8081
private Sensor droppedRecordsSensor;
8182

8283
@Override
@@ -107,15 +108,16 @@ public void process(final Record<K, V1> record) {
107108
final boolean emitSelfRecord = inputRecordTimestamp > timeTracker.streamTime - retentionPeriod + 1;
108109

109110
// Join current record with other
110-
try (final WindowStoreIterator<V2> iter = windowStore.fetch(record.key(), timeFrom, timeTo)) {
111+
try (final WindowStoreIterator<ValueTimestampHeaders<V2>> iter = windowStore.fetch(record.key(), timeFrom, timeTo)) {
111112
while (iter.hasNext()) {
112-
final KeyValue<Long, V2> otherRecord = iter.next();
113+
final KeyValue<Long, ValueTimestampHeaders<V2>> otherRecord = iter.next();
113114
final long otherRecordTimestamp = otherRecord.key;
115+
final V2 otherValue = otherRecord.value == null ? null : otherRecord.value.value();
114116

115117
// Join this with other
116118
context().forward(
117119
record.withValue(joinerThis.apply(
118-
record.key(), record.value(), otherRecord.value))
120+
record.key(), record.value(), otherValue))
119121
.withTimestamp(Math.max(inputRecordTimestamp, otherRecordTimestamp)));
120122
}
121123
}
@@ -124,11 +126,12 @@ public void process(final Record<K, V1> record) {
124126
// correct ordering means it matches the output of an inner join.
125127
timeFrom = Math.max(0L, inputRecordTimestamp - joinOtherBeforeMs);
126128
timeTo = Math.max(0L, inputRecordTimestamp + joinOtherAfterMs);
127-
try (final WindowStoreIterator<V2> iter2 = windowStore.fetch(record.key(), timeFrom, timeTo)) {
129+
try (final WindowStoreIterator<ValueTimestampHeaders<V2>> iter2 = windowStore.fetch(record.key(), timeFrom, timeTo)) {
128130
while (iter2.hasNext()) {
129-
final KeyValue<Long, V2> otherRecord = iter2.next();
131+
final KeyValue<Long, ValueTimestampHeaders<V2>> otherRecord = iter2.next();
130132
final long otherRecordTimestamp = otherRecord.key;
131133
final long maxRecordTimestamp = Math.max(inputRecordTimestamp, otherRecordTimestamp);
134+
final V2 otherValue = otherRecord.value == null ? null : otherRecord.value.value();
132135

133136
// This is needed so that output records follow timestamp order
134137
// Join this with self
@@ -140,7 +143,7 @@ public void process(final Record<K, V1> record) {
140143
// Join other with current record
141144
context().forward(
142145
record
143-
.withValue(joinerThis.apply(record.key(), (V1) otherRecord.value, (V2) record.value()))
146+
.withValue(joinerThis.apply(record.key(), (V1) otherValue, (V2) record.value()))
144147
.withTimestamp(Math.max(inputRecordTimestamp, otherRecordTimestamp)));
145148
}
146149
}

streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinProcessor.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@
2828
import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
2929
import org.apache.kafka.streams.processor.internals.SerdeGetter;
3030
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
31-
import org.apache.kafka.streams.state.ValueAndTimestamp;
31+
import org.apache.kafka.streams.state.ValueTimestampHeaders;
3232
import org.apache.kafka.streams.state.internals.TimeOrderedKeyValueBuffer;
3333

3434
import org.slf4j.Logger;
@@ -40,7 +40,7 @@
4040
import static java.util.Objects.requireNonNull;
4141
import static org.apache.kafka.streams.processor.internals.ProcessorContextUtils.asInternalProcessorContext;
4242
import static org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensor;
43-
import static org.apache.kafka.streams.state.ValueAndTimestamp.getValueOrNull;
43+
import static org.apache.kafka.streams.state.ValueTimestampHeaders.getValueOrNull;
4444

4545
class KStreamKTableJoinProcessor<StreamKey, StreamValue, TableKey, TableValue, VOut>
4646
extends ContextualProcessor<StreamKey, StreamValue, StreamKey, VOut> {
@@ -134,10 +134,10 @@ private void doJoin(final Record<StreamKey, StreamValue> record) {
134134

135135
private TableValue getTableValue(final Record<StreamKey, StreamValue> record, final TableKey mappedKey) {
136136
if (mappedKey == null) return null;
137-
final ValueAndTimestamp<TableValue> valueAndTimestamp = valueGetter.isVersioned()
137+
final ValueTimestampHeaders<TableValue> valueTimestampHeaders = valueGetter.isVersioned()
138138
? valueGetter.get(mappedKey, record.timestamp())
139139
: valueGetter.get(mappedKey);
140-
return getValueOrNull(valueAndTimestamp);
140+
return getValueOrNull(valueTimestampHeaders);
141141
}
142142

143143
private boolean maybeDropRecord(final Record<StreamKey, StreamValue> record) {

streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamReduce.java

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
*/
1717
package org.apache.kafka.streams.kstream.internals;
1818

19+
import org.apache.kafka.common.header.internals.RecordHeaders;
1920
import org.apache.kafka.common.metrics.Sensor;
2021
import org.apache.kafka.streams.kstream.Reducer;
2122
import org.apache.kafka.streams.processor.api.ContextualProcessor;
@@ -27,7 +28,7 @@
2728
import org.apache.kafka.streams.processor.internals.StoreFactory.FactoryWrappingStoreBuilder;
2829
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
2930
import org.apache.kafka.streams.state.StoreBuilder;
30-
import org.apache.kafka.streams.state.ValueAndTimestamp;
31+
import org.apache.kafka.streams.state.ValueTimestampHeaders;
3132
import org.apache.kafka.streams.state.internals.KeyValueStoreWrapper;
3233

3334
import org.slf4j.Logger;
@@ -37,7 +38,7 @@
3738
import java.util.Set;
3839

3940
import static org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensor;
40-
import static org.apache.kafka.streams.state.ValueAndTimestamp.getValueOrNull;
41+
import static org.apache.kafka.streams.state.ValueTimestampHeaders.getValueOrNull;
4142
import static org.apache.kafka.streams.state.VersionedKeyValueStore.PUT_RETURN_CODE_NOT_PUT;
4243
import static org.apache.kafka.streams.state.internals.KeyValueStoreWrapper.PUT_RETURN_CODE_IS_LATEST;
4344

@@ -91,7 +92,7 @@ public void init(final ProcessorContext<K, Change<V>> context) {
9192
tupleForwarder = new TimestampedTupleForwarder<>(
9293
store.store(),
9394
context,
94-
new TimestampedCacheFlushListener<>(context),
95+
new TimestampedCacheFlushListenerWithHeaders<>(context),
9596
sendOldValues);
9697
}
9798

@@ -115,7 +116,7 @@ public void process(final Record<K, V> record) {
115116
return;
116117
}
117118

118-
final ValueAndTimestamp<V> oldAggAndTimestamp = store.get(record.key());
119+
final ValueTimestampHeaders<V> oldAggAndTimestamp = store.get(record.key());
119120
final V oldAgg = getValueOrNull(oldAggAndTimestamp);
120121

121122
final V newAgg;
@@ -129,7 +130,7 @@ public void process(final Record<K, V> record) {
129130
newTimestamp = Math.max(record.timestamp(), oldAggAndTimestamp.timestamp());
130131
}
131132

132-
final long putReturnCode = store.put(record.key(), newAgg, newTimestamp);
133+
final long putReturnCode = store.put(record.key(), newAgg, newTimestamp, new RecordHeaders());
133134
// if not put to store, do not forward downstream either
134135
if (putReturnCode != PUT_RETURN_CODE_NOT_PUT) {
135136
tupleForwarder.maybeForward(
@@ -164,12 +165,12 @@ public void init(final ProcessorContext<?, ?> context) {
164165
}
165166

166167
@Override
167-
public ValueAndTimestamp<V> get(final K key) {
168+
public ValueTimestampHeaders<V> get(final K key) {
168169
return store.get(key);
169170
}
170171

171172
@Override
172-
public ValueAndTimestamp<V> get(final K key, final long asOfTimestamp) {
173+
public ValueTimestampHeaders<V> get(final K key, final long asOfTimestamp) {
173174
return store.get(key, asOfTimestamp);
174175
}
175176

0 commit comments

Comments (0)