Skip to content

Commit ef14f76

Browse files
authored
KAFKA-6629: parameterise SegmentedCacheFunctionTest for session key schemas (#19404)
Addresses: [KAFKA-6629](https://issues.apache.org/jira/browse/KAFKA-6629) Adds configuration for the SessionKeySchema and parameterises the existing tests so that both WindowKeys and SessionKeys are tested under the existing unit tests. Reviewers: Bill Bejeck <bbejeck@apache.org> --------- Co-authored-by: Lorcan <lorcanjames1@gmail.com>
1 parent 13b5627 commit ef14f76

1 file changed

Lines changed: 92 additions & 39 deletions

File tree

streams/src/test/java/org/apache/kafka/streams/state/internals/SegmentedCacheFunctionTest.java

Lines changed: 92 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -19,97 +19,151 @@
1919

2020
import org.apache.kafka.common.utils.Bytes;
2121

22-
import org.junit.jupiter.api.Test;
22+
import org.junit.jupiter.params.ParameterizedTest;
23+
import org.junit.jupiter.params.provider.Arguments;
24+
import org.junit.jupiter.params.provider.MethodSource;
2325

2426
import java.nio.ByteBuffer;
27+
import java.util.stream.Stream;
2528

29+
import static org.apache.kafka.streams.state.StateSerdes.TIMESTAMP_SIZE;
2630
import static org.hamcrest.MatcherAssert.assertThat;
2731
import static org.hamcrest.core.IsEqual.equalTo;
2832

29-
// TODO: this test coverage does not consider session serde yet
30-
public class SegmentedCacheFunctionTest {
33+
class SegmentedCacheFunctionTest {
3134

3235
private static final int SEGMENT_INTERVAL = 17;
33-
private static final int TIMESTAMP = 736213517;
36+
private static final int START_TIMESTAMP = 736213517;
37+
private static final int END_TIMESTAMP = 800000000;
3438

35-
private static final Bytes THE_KEY = WindowKeySchema.toStoreKeyBinary(new byte[]{0xA, 0xB, 0xC}, TIMESTAMP, 42);
36-
private static final Bytes THE_CACHE_KEY = Bytes.wrap(
37-
ByteBuffer.allocate(8 + THE_KEY.get().length)
38-
.putLong(TIMESTAMP / SEGMENT_INTERVAL)
39-
.put(THE_KEY.get()).array()
39+
private static final Bytes THE_WINDOW_KEY = WindowKeySchema.toStoreKeyBinary(new byte[]{0xA, 0xB, 0xC}, START_TIMESTAMP, 42);
40+
private static final Bytes THE_SESSION_KEY = toStoreKeyBinary(new byte[]{0xA, 0xB, 0xC}, END_TIMESTAMP, START_TIMESTAMP);
41+
42+
private static final Bytes THE_WINDOW_CACHE_KEY = Bytes.wrap(
43+
ByteBuffer.allocate(8 + THE_WINDOW_KEY.get().length)
44+
.putLong(START_TIMESTAMP / SEGMENT_INTERVAL)
45+
.put(THE_WINDOW_KEY.get()).array()
4046
);
4147

42-
private final SegmentedCacheFunction cacheFunction = new SegmentedCacheFunction(new WindowKeySchema(), SEGMENT_INTERVAL);
48+
private static final Bytes THE_SESSION_CACHE_KEY = Bytes.wrap(
49+
ByteBuffer.allocate(8 + THE_SESSION_KEY.get().length)
50+
.putLong(END_TIMESTAMP / SEGMENT_INTERVAL)
51+
.put(THE_SESSION_KEY.get()).array()
52+
);
53+
54+
private SegmentedCacheFunction createCacheFunction(final SegmentedBytesStore.KeySchema keySchema) {
55+
return new SegmentedCacheFunction(keySchema, SEGMENT_INTERVAL);
56+
}
4357

44-
@Test
45-
public void key() {
46-
assertThat(
47-
cacheFunction.key(THE_CACHE_KEY),
48-
equalTo(THE_KEY)
58+
private static Stream<Arguments> provideKeysAndSchemas() {
59+
return Stream.of(
60+
Arguments.of(THE_WINDOW_CACHE_KEY, THE_WINDOW_KEY, new WindowKeySchema()),
61+
Arguments.of(THE_SESSION_CACHE_KEY, THE_SESSION_KEY, new SessionKeySchema())
62+
);
63+
}
64+
65+
private static Stream<Arguments> provideKeysTimestampsAndSchemas() {
66+
return Stream.of(
67+
Arguments.of(THE_WINDOW_KEY, START_TIMESTAMP, new WindowKeySchema()),
68+
Arguments.of(THE_SESSION_KEY, END_TIMESTAMP, new SessionKeySchema())
69+
);
70+
}
71+
72+
private static Stream<Arguments> provideKeysForBoundaryChecks() {
73+
final Bytes sameKeyInPriorSegmentWindow = WindowKeySchema.toStoreKeyBinary(new byte[]{0xA, 0xB, 0xC}, 1234, 42);
74+
final Bytes sameKeyInPriorSegmentSession = toStoreKeyBinary(new byte[]{0xA, 0xB, 0xC}, 1234, 12345);
75+
76+
final Bytes lowerKeyInSameSegmentWindow = WindowKeySchema.toStoreKeyBinary(new byte[]{0xA, 0xB, 0xB}, START_TIMESTAMP - 1, 0);
77+
final Bytes lowerKeyInSameSegmentSession = toStoreKeyBinary(new byte[]{0xA, 0xB, 0xB}, END_TIMESTAMP - 1, START_TIMESTAMP + 1);
78+
79+
return Stream.of(
80+
Arguments.of(THE_WINDOW_KEY, new WindowKeySchema(), sameKeyInPriorSegmentWindow, lowerKeyInSameSegmentWindow),
81+
Arguments.of(THE_SESSION_KEY, new SessionKeySchema(), sameKeyInPriorSegmentSession, lowerKeyInSameSegmentSession)
4982
);
5083
}
5184

52-
@Test
53-
public void cacheKey() {
54-
final long segmentId = TIMESTAMP / SEGMENT_INTERVAL;
85+
static Bytes toStoreKeyBinary(final byte[] serializedKey,
86+
final long endTime,
87+
final long startTime) {
88+
final ByteBuffer buf = ByteBuffer.allocate(serializedKey.length + TIMESTAMP_SIZE + TIMESTAMP_SIZE);
89+
buf.put(serializedKey);
90+
buf.putLong(endTime);
91+
buf.putLong(startTime);
5592

56-
final Bytes actualCacheKey = cacheFunction.cacheKey(THE_KEY);
93+
return Bytes.wrap(buf.array());
94+
}
95+
96+
@ParameterizedTest
97+
@MethodSource("provideKeysAndSchemas")
98+
void testKey(final Bytes cacheKey, final Bytes key, final SegmentedBytesStore.KeySchema keySchema) {
99+
assertThat(
100+
createCacheFunction(keySchema).key(cacheKey),
101+
equalTo(key)
102+
);
103+
}
104+
105+
@ParameterizedTest
106+
@MethodSource("provideKeysTimestampsAndSchemas")
107+
void cacheKey(final Bytes key, final int timeStamp, final SegmentedBytesStore.KeySchema keySchema) {
108+
final long segmentId = timeStamp / SEGMENT_INTERVAL;
109+
final Bytes actualCacheKey = createCacheFunction(keySchema).cacheKey(key);
57110
final ByteBuffer buffer = ByteBuffer.wrap(actualCacheKey.get());
58111

59112
assertThat(buffer.getLong(), equalTo(segmentId));
60113

61114
final byte[] actualKey = new byte[buffer.remaining()];
62115
buffer.get(actualKey);
63-
assertThat(Bytes.wrap(actualKey), equalTo(THE_KEY));
116+
assertThat(Bytes.wrap(actualKey), equalTo(key));
64117
}
65118

66-
@Test
67-
public void testRoundTripping() {
119+
@ParameterizedTest
120+
@MethodSource("provideKeysAndSchemas")
121+
void testRoundTripping(final Bytes cacheKey, final Bytes key, final SegmentedBytesStore.KeySchema keySchema) {
122+
final SegmentedCacheFunction cacheFunction = createCacheFunction(keySchema);
123+
68124
assertThat(
69-
cacheFunction.key(cacheFunction.cacheKey(THE_KEY)),
70-
equalTo(THE_KEY)
125+
cacheFunction.key(cacheFunction.cacheKey(key)),
126+
equalTo(key)
71127
);
72128

73129
assertThat(
74-
cacheFunction.cacheKey(cacheFunction.key(THE_CACHE_KEY)),
75-
equalTo(THE_CACHE_KEY)
130+
cacheFunction.cacheKey(cacheFunction.key(cacheKey)),
131+
equalTo(cacheKey)
76132
);
77133
}
78134

79-
@Test
80-
public void compareSegmentedKeys() {
135+
@ParameterizedTest
136+
@MethodSource("provideKeysForBoundaryChecks")
137+
void compareSegmentedKeys(final Bytes key, final SegmentedBytesStore.KeySchema keySchema, final Bytes sameKeyInPriorSegment, final Bytes lowerKeyInSameSegment) {
138+
final SegmentedCacheFunction cacheFunction = createCacheFunction(keySchema);
81139
assertThat(
82140
"same key in same segment should be ranked the same",
83141
cacheFunction.compareSegmentedKeys(
84-
cacheFunction.cacheKey(THE_KEY),
85-
THE_KEY
142+
cacheFunction.cacheKey(key),
143+
key
86144
) == 0
87145
);
88146

89-
final Bytes sameKeyInPriorSegment = WindowKeySchema.toStoreKeyBinary(new byte[]{0xA, 0xB, 0xC}, 1234, 42);
90-
91147
assertThat(
92148
"same keys in different segments should be ordered according to segment",
93149
cacheFunction.compareSegmentedKeys(
94150
cacheFunction.cacheKey(sameKeyInPriorSegment),
95-
THE_KEY
151+
key
96152
) < 0
97153
);
98154

99155
assertThat(
100156
"same keys in different segments should be ordered according to segment",
101157
cacheFunction.compareSegmentedKeys(
102-
cacheFunction.cacheKey(THE_KEY),
158+
cacheFunction.cacheKey(key),
103159
sameKeyInPriorSegment
104160
) > 0
105161
);
106162

107-
final Bytes lowerKeyInSameSegment = WindowKeySchema.toStoreKeyBinary(new byte[]{0xA, 0xB, 0xB}, TIMESTAMP - 1, 0);
108-
109163
assertThat(
110164
"different keys in same segments should be ordered according to key",
111165
cacheFunction.compareSegmentedKeys(
112-
cacheFunction.cacheKey(THE_KEY),
166+
cacheFunction.cacheKey(key),
113167
lowerKeyInSameSegment
114168
) > 0
115169
);
@@ -118,9 +172,8 @@ public void compareSegmentedKeys() {
118172
"different keys in same segments should be ordered according to key",
119173
cacheFunction.compareSegmentedKeys(
120174
cacheFunction.cacheKey(lowerKeyInSameSegment),
121-
THE_KEY
175+
key
122176
) < 0
123177
);
124178
}
125-
126179
}

0 commit comments

Comments
 (0)