Skip to content

Commit 18c6d6f

Browse files
committed
add complete txn tests to KafkaProducer
1 parent 488feeb commit 18c6d6f

1 file changed

Lines changed: 77 additions & 0 deletions

File tree

clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java

Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
import org.apache.kafka.clients.producer.internals.RecordAccumulator;
3333
import org.apache.kafka.clients.producer.internals.Sender;
3434
import org.apache.kafka.clients.producer.internals.TransactionManager;
35+
import org.apache.kafka.clients.producer.internals.TransactionalRequestResult;
3536
import org.apache.kafka.common.Cluster;
3637
import org.apache.kafka.common.KafkaException;
3738
import org.apache.kafka.common.Metric;
@@ -1601,6 +1602,82 @@ public void testPrepareTransactionFailsWhen2PCDisabled() {
16011602
}
16021603
}
16031604

1605+
@Test
1606+
public void testCompleteTransactionWithMatchingState() throws Exception {
1607+
StringSerializer serializer = new StringSerializer();
1608+
KafkaProducerTestContext<String> ctx = new KafkaProducerTestContext<>(testInfo, serializer);
1609+
1610+
when(ctx.transactionManager.isPrepared()).thenReturn(true);
1611+
when(ctx.sender.isRunning()).thenReturn(true);
1612+
1613+
// Create prepared states with matching values
1614+
long producerId = 12345L;
1615+
short epoch = 5;
1616+
PreparedTxnState currentState = new PreparedTxnState(producerId, epoch);
1617+
PreparedTxnState inputState = new PreparedTxnState(producerId, epoch);
1618+
1619+
// Set up the transaction manager to return the prepared state
1620+
when(ctx.transactionManager.preparedTransactionState()).thenReturn(currentState);
1621+
1622+
// Should trigger commit when states match
1623+
TransactionalRequestResult commitResult = mock(TransactionalRequestResult.class);
1624+
when(ctx.transactionManager.beginCommit()).thenReturn(commitResult);
1625+
1626+
try (KafkaProducer<String, String> producer = ctx.newKafkaProducer()) {
1627+
// Call completeTransaction with the matching state
1628+
producer.completeTransaction(inputState);
1629+
1630+
// Verify methods called in order
1631+
verify(ctx.transactionManager).isPrepared();
1632+
verify(ctx.transactionManager).preparedTransactionState();
1633+
verify(ctx.transactionManager).beginCommit();
1634+
1635+
// Verify abort was never called
1636+
verify(ctx.transactionManager, never()).beginAbort();
1637+
1638+
// Verify sender was woken up
1639+
verify(ctx.sender).wakeup();
1640+
}
1641+
}
1642+
1643+
@Test
1644+
public void testCompleteTransactionWithNonMatchingState() throws Exception {
1645+
StringSerializer serializer = new StringSerializer();
1646+
KafkaProducerTestContext<String> ctx = new KafkaProducerTestContext<>(testInfo, serializer);
1647+
1648+
when(ctx.transactionManager.isPrepared()).thenReturn(true);
1649+
when(ctx.sender.isRunning()).thenReturn(true);
1650+
1651+
// Create txn prepared states with different values
1652+
long producerId = 12345L;
1653+
short epoch = 5;
1654+
PreparedTxnState currentState = new PreparedTxnState(producerId, epoch);
1655+
PreparedTxnState inputState = new PreparedTxnState(producerId + 1, epoch);
1656+
1657+
// Set up the transaction manager to return the prepared state
1658+
when(ctx.transactionManager.preparedTransactionState()).thenReturn(currentState);
1659+
1660+
// Should trigger abort when states don't match
1661+
TransactionalRequestResult abortResult = mock(TransactionalRequestResult.class);
1662+
when(ctx.transactionManager.beginAbort()).thenReturn(abortResult);
1663+
1664+
try (KafkaProducer<String, String> producer = ctx.newKafkaProducer()) {
1665+
// Call completeTransaction with the non-matching state
1666+
producer.completeTransaction(inputState);
1667+
1668+
// Verify methods called in order
1669+
verify(ctx.transactionManager).isPrepared();
1670+
verify(ctx.transactionManager).preparedTransactionState();
1671+
verify(ctx.transactionManager).beginAbort();
1672+
1673+
// Verify commit was never called
1674+
verify(ctx.transactionManager, never()).beginCommit();
1675+
1676+
// Verify sender was woken up
1677+
verify(ctx.sender).wakeup();
1678+
}
1679+
}
1680+
16041681
@Test
16051682
public void testClusterAuthorizationFailure() throws Exception {
16061683
int maxBlockMs = 500;

0 commit comments

Comments
 (0)