Skip to content

Commit f42abe6

Browse files
authored
KAFKA-19082:[4/4] Complete Txn Client Side Changes (KIP-939) (#19714)
public void completeTransaction(PreparedTxnState preparedTxnState) The method compares the currently prepared transaction state and the state passed in the argument. 1. Commit if the state matches 2. Abort the transaction otherwise. If the producer is not in a prepared state (i.e., neither prepareTransaction was called nor initTransaction(true) was called), we return an INVALID_TXN_STATE error. Reviewers: Justine Olshan <jolshan@confluent.io>, Artem Livshits <alivshits@confluent.io>
1 parent ef14f76 commit f42abe6

5 files changed

Lines changed: 141 additions & 3 deletions

File tree

clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -884,6 +884,40 @@ public void abortTransaction() throws ProducerFencedException {
884884
producerMetrics.recordAbortTxn(time.nanoseconds() - abortStart);
885885
}
886886

887+
/**
888+
* Completes a prepared transaction by comparing the provided prepared transaction state with the
889+
* current prepared state on the producer.
890+
* If they match, the transaction is committed; otherwise, it is aborted.
891+
*
892+
* @param preparedTxnState The prepared transaction state to compare against the current state
893+
* @throws IllegalStateException if no transactional.id has been configured or no transaction has been started
894+
* @throws InvalidTxnStateException if the producer is not in prepared state
895+
* @throws ProducerFencedException fatal error indicating another producer with the same transactional.id is active
896+
* @throws KafkaException if the producer has encountered a previous fatal error or for any other unexpected error
897+
* @throws TimeoutException if the time taken for completing the transaction has surpassed <code>max.block.ms</code>
898+
* @throws InterruptException if the thread is interrupted while blocked
899+
*/
900+
@Override
901+
public void completeTransaction(PreparedTxnState preparedTxnState) throws ProducerFencedException {
902+
throwIfNoTransactionManager();
903+
throwIfProducerClosed();
904+
905+
if (!transactionManager.isPrepared()) {
906+
throw new InvalidTxnStateException("Cannot complete transaction because no transaction has been prepared. " +
907+
"Call prepareTransaction() first, or make sure initTransaction(true) was called.");
908+
}
909+
910+
// Get the current prepared transaction state
911+
PreparedTxnState currentPreparedState = transactionManager.preparedTransactionState();
912+
913+
// Compare the prepared transaction state token and commit or abort accordingly
914+
if (currentPreparedState.equals(preparedTxnState)) {
915+
commitTransaction();
916+
} else {
917+
abortTransaction();
918+
}
919+
}
920+
887921
/**
888922
* Asynchronously send a record to a topic. Equivalent to <code>send(record, null)</code>.
889923
* See {@link #send(ProducerRecord, Callback)} for details.

clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -257,6 +257,27 @@ public void abortTransaction() throws ProducerFencedException {
257257
this.transactionInFlight = false;
258258
}
259259

260+
@Override
261+
public void completeTransaction(PreparedTxnState preparedTxnState) throws ProducerFencedException {
262+
verifyNotClosed();
263+
verifyNotFenced();
264+
verifyTransactionsInitialized();
265+
266+
if (!this.transactionInFlight) {
267+
throw new IllegalStateException("There is no prepared transaction to complete.");
268+
}
269+
270+
// For testing purposes, we'll consider a prepared state with producerId=1000L and epoch=1 as valid
271+
// This should match what's returned in prepareTransaction()
272+
PreparedTxnState currentState = new PreparedTxnState(1000L, (short) 1);
273+
274+
if (currentState.equals(preparedTxnState)) {
275+
commitTransaction();
276+
} else {
277+
abortTransaction();
278+
}
279+
}
280+
260281
private synchronized void verifyNotClosed() {
261282
if (this.closed) {
262283
throw new IllegalStateException("MockProducer is already closed.");

clients/src/main/java/org/apache/kafka/clients/producer/Producer.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,11 @@ void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets,
7777
*/
7878
void abortTransaction() throws ProducerFencedException;
7979

80+
/**
81+
* See {@link KafkaProducer#completeTransaction(PreparedTxnState)}
82+
*/
83+
void completeTransaction(PreparedTxnState preparedTxnState) throws ProducerFencedException;
84+
8085
/**
8186
* @see KafkaProducer#registerMetricForSubscription(KafkaMetric)
8287
*/

clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -913,7 +913,7 @@ synchronized TxnRequestHandler nextRequest(boolean hasIncompleteBatches) {
913913
log.debug("Not sending EndTxn for completed transaction since no partitions " +
914914
"or offsets were successfully added");
915915
}
916-
completeTransaction();
916+
resetTransactionState();
917917
}
918918
nextRequestHandler = pendingRequests.poll();
919919
}
@@ -1320,7 +1320,7 @@ boolean canHandleAbortableError() {
13201320
return coordinatorSupportsBumpingEpoch || isTransactionV2Enabled;
13211321
}
13221322

1323-
private void completeTransaction() {
1323+
private void resetTransactionState() {
13241324
if (clientSideEpochBumpRequired) {
13251325
transitionTo(State.INITIALIZING);
13261326
} else {
@@ -1332,6 +1332,7 @@ private void completeTransaction() {
13321332
newPartitionsInTransaction.clear();
13331333
pendingPartitionsInTransaction.clear();
13341334
partitionsInTransaction.clear();
1335+
preparedTxnState = new PreparedTxnState();
13351336
}
13361337

13371338
abstract class TxnRequestHandler implements RequestCompletionHandler {
@@ -1743,7 +1744,7 @@ public void handleResponse(AbstractResponse response) {
17431744
setProducerIdAndEpoch(producerIdAndEpoch);
17441745
resetSequenceNumbers();
17451746
}
1746-
completeTransaction();
1747+
resetTransactionState();
17471748
result.done();
17481749
} else if (error == Errors.COORDINATOR_NOT_AVAILABLE || error == Errors.NOT_COORDINATOR) {
17491750
lookupCoordinator(FindCoordinatorRequest.CoordinatorType.TRANSACTION, transactionalId);

clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java

Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
import org.apache.kafka.clients.producer.internals.RecordAccumulator;
3333
import org.apache.kafka.clients.producer.internals.Sender;
3434
import org.apache.kafka.clients.producer.internals.TransactionManager;
35+
import org.apache.kafka.clients.producer.internals.TransactionalRequestResult;
3536
import org.apache.kafka.common.Cluster;
3637
import org.apache.kafka.common.KafkaException;
3738
import org.apache.kafka.common.Metric;
@@ -1600,6 +1601,82 @@ public void testPrepareTransactionFailsWhen2PCDisabled() {
16001601
}
16011602
}
16021603

1604+
@Test
1605+
public void testCompleteTransactionWithMatchingState() throws Exception {
1606+
StringSerializer serializer = new StringSerializer();
1607+
KafkaProducerTestContext<String> ctx = new KafkaProducerTestContext<>(testInfo, serializer);
1608+
1609+
when(ctx.transactionManager.isPrepared()).thenReturn(true);
1610+
when(ctx.sender.isRunning()).thenReturn(true);
1611+
1612+
// Create prepared states with matching values
1613+
long producerId = 12345L;
1614+
short epoch = 5;
1615+
PreparedTxnState currentState = new PreparedTxnState(producerId, epoch);
1616+
PreparedTxnState inputState = new PreparedTxnState(producerId, epoch);
1617+
1618+
// Set up the transaction manager to return the prepared state
1619+
when(ctx.transactionManager.preparedTransactionState()).thenReturn(currentState);
1620+
1621+
// Should trigger commit when states match
1622+
TransactionalRequestResult commitResult = mock(TransactionalRequestResult.class);
1623+
when(ctx.transactionManager.beginCommit()).thenReturn(commitResult);
1624+
1625+
try (KafkaProducer<String, String> producer = ctx.newKafkaProducer()) {
1626+
// Call completeTransaction with the matching state
1627+
producer.completeTransaction(inputState);
1628+
1629+
// Verify methods called in order
1630+
verify(ctx.transactionManager).isPrepared();
1631+
verify(ctx.transactionManager).preparedTransactionState();
1632+
verify(ctx.transactionManager).beginCommit();
1633+
1634+
// Verify abort was never called
1635+
verify(ctx.transactionManager, never()).beginAbort();
1636+
1637+
// Verify sender was woken up
1638+
verify(ctx.sender).wakeup();
1639+
}
1640+
}
1641+
1642+
@Test
1643+
public void testCompleteTransactionWithNonMatchingState() throws Exception {
1644+
StringSerializer serializer = new StringSerializer();
1645+
KafkaProducerTestContext<String> ctx = new KafkaProducerTestContext<>(testInfo, serializer);
1646+
1647+
when(ctx.transactionManager.isPrepared()).thenReturn(true);
1648+
when(ctx.sender.isRunning()).thenReturn(true);
1649+
1650+
// Create txn prepared states with different values
1651+
long producerId = 12345L;
1652+
short epoch = 5;
1653+
PreparedTxnState currentState = new PreparedTxnState(producerId, epoch);
1654+
PreparedTxnState inputState = new PreparedTxnState(producerId + 1, epoch);
1655+
1656+
// Set up the transaction manager to return the prepared state
1657+
when(ctx.transactionManager.preparedTransactionState()).thenReturn(currentState);
1658+
1659+
// Should trigger abort when states don't match
1660+
TransactionalRequestResult abortResult = mock(TransactionalRequestResult.class);
1661+
when(ctx.transactionManager.beginAbort()).thenReturn(abortResult);
1662+
1663+
try (KafkaProducer<String, String> producer = ctx.newKafkaProducer()) {
1664+
// Call completeTransaction with the non-matching state
1665+
producer.completeTransaction(inputState);
1666+
1667+
// Verify methods called in order
1668+
verify(ctx.transactionManager).isPrepared();
1669+
verify(ctx.transactionManager).preparedTransactionState();
1670+
verify(ctx.transactionManager).beginAbort();
1671+
1672+
// Verify commit was never called
1673+
verify(ctx.transactionManager, never()).beginCommit();
1674+
1675+
// Verify sender was woken up
1676+
verify(ctx.sender).wakeup();
1677+
}
1678+
}
1679+
16031680
@Test
16041681
public void testClusterAuthorizationFailure() throws Exception {
16051682
int maxBlockMs = 500;

0 commit comments

Comments
 (0)