Skip to content
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -884,6 +884,50 @@ public void abortTransaction() throws ProducerFencedException {
producerMetrics.recordAbortTxn(time.nanoseconds() - abortStart);
}

/**
* Completes a prepared transaction by comparing the provided prepared transaction state with the
* current prepared state on the producer.
* If they match, the transaction is committed; otherwise, it is aborted.
*
* @param preparedTxnState The prepared transaction state to compare against the current state
* @throws IllegalStateException if the producer is not in prepared transaction state
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what is the difference between IlegalState and InvalidTxnState?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah my bad, I didn't change the Javadoc, illegalState. From my understanding, IllegalStateException is used when the producer is in an incorrect state and cannot use the API?
InvalidTxnStateException is used for when the transaction is in the wrong state.

* @throws InvalidTxnStateException if the producer is not in prepared state
* @throws ProducerFencedException fatal error indicating another producer with the same transactional.id is active
* @throws KafkaException if the producer has encountered a previous fatal error or for any other unexpected error
* @throws TimeoutException if the time taken for completing the transaction has surpassed <code>max.block.ms</code>
* @throws InterruptException if the thread is interrupted while blocked
*/
@Override
public void completeTransaction(PreparedTxnState preparedTxnState) throws ProducerFencedException {
throwIfNoTransactionManager();
throwIfProducerClosed();

if (!transactionManager.isPrepared()) {
throw new InvalidTxnStateException("Cannot complete transaction because no transaction has been prepared. " +
"Call prepareTransaction() first, or make sure initTransaction(true) was called.");
}

// Get the current prepared transaction state
PreparedTxnState currentPreparedState = transactionManager.preparedTransactionState();

long completeStart = time.nanoseconds();

// Compare the prepared transaction state token and commit or abort accordingly
if (currentPreparedState.equals(preparedTxnState)) {
Comment thread
rreddy-22 marked this conversation as resolved.
log.info("Committing prepared transaction as the prepared state token matches");
Comment thread
rreddy-22 marked this conversation as resolved.
Outdated
TransactionalRequestResult result = transactionManager.beginCommit();
sender.wakeup();
result.await(maxBlockTimeMs, TimeUnit.MILLISECONDS);
producerMetrics.recordCommitTxn(time.nanoseconds() - completeStart);
} else {
log.info("Aborting prepared transaction as the prepared state token does not match");
TransactionalRequestResult result = transactionManager.beginAbort();
sender.wakeup();
result.await(maxBlockTimeMs, TimeUnit.MILLISECONDS);
producerMetrics.recordAbortTxn(time.nanoseconds() - completeStart);
}
}

/**
* Asynchronously send a record to a topic. Equivalent to <code>send(record, null)</code>.
* See {@link #send(ProducerRecord, Callback)} for details.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -257,6 +257,27 @@ public void abortTransaction() throws ProducerFencedException {
this.transactionInFlight = false;
}

@Override
public void completeTransaction(PreparedTxnState preparedTxnState) throws ProducerFencedException {
verifyNotClosed();
verifyNotFenced();
verifyTransactionsInitialized();

if (!this.transactionInFlight) {
throw new IllegalStateException("There is no prepared transaction to complete.");
}

// For testing purposes, we'll consider a prepared state with producerId=1000L and epoch=1 as valid
// This should match what's returned in prepareTransaction()
PreparedTxnState currentState = new PreparedTxnState(1000L, (short) 1);

if (currentState.equals(preparedTxnState)) {
commitTransaction();
} else {
abortTransaction();
}
}

private synchronized void verifyNotClosed() {
if (this.closed) {
throw new IllegalStateException("MockProducer is already closed.");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,11 @@ void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets,
*/
void abortTransaction() throws ProducerFencedException;

/**
* See {@link KafkaProducer#completeTransaction(PreparedTxnState)}
*/
void completeTransaction(PreparedTxnState preparedTxnState) throws ProducerFencedException;

/**
* @see KafkaProducer#registerMetricForSubscription(KafkaMetric)
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -913,7 +913,7 @@ synchronized TxnRequestHandler nextRequest(boolean hasIncompleteBatches) {
log.debug("Not sending EndTxn for completed transaction since no partitions " +
"or offsets were successfully added");
}
completeTransaction();
resetTransactionState();
}
nextRequestHandler = pendingRequests.poll();
}
Expand Down Expand Up @@ -1320,7 +1320,7 @@ boolean canHandleAbortableError() {
return coordinatorSupportsBumpingEpoch || isTransactionV2Enabled;
}

private void completeTransaction() {
private void resetTransactionState() {
if (clientSideEpochBumpRequired) {
transitionTo(State.INITIALIZING);
} else {
Expand All @@ -1332,6 +1332,7 @@ private void completeTransaction() {
newPartitionsInTransaction.clear();
pendingPartitionsInTransaction.clear();
partitionsInTransaction.clear();
preparedTxnState = new PreparedTxnState();
Comment thread
rreddy-22 marked this conversation as resolved.
}

abstract class TxnRequestHandler implements RequestCompletionHandler {
Expand Down Expand Up @@ -1743,7 +1744,7 @@ public void handleResponse(AbstractResponse response) {
setProducerIdAndEpoch(producerIdAndEpoch);
resetSequenceNumbers();
}
completeTransaction();
resetTransactionState();
result.done();
} else if (error == Errors.COORDINATOR_NOT_AVAILABLE || error == Errors.NOT_COORDINATOR) {
lookupCoordinator(FindCoordinatorRequest.CoordinatorType.TRANSACTION, transactionalId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.apache.kafka.clients.producer.internals.RecordAccumulator;
import org.apache.kafka.clients.producer.internals.Sender;
import org.apache.kafka.clients.producer.internals.TransactionManager;
import org.apache.kafka.clients.producer.internals.TransactionalRequestResult;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.Metric;
Expand Down Expand Up @@ -1601,6 +1602,82 @@ public void testPrepareTransactionFailsWhen2PCDisabled() {
}
}

@Test
public void testCompleteTransactionWithMatchingState() throws Exception {
StringSerializer serializer = new StringSerializer();
KafkaProducerTestContext<String> ctx = new KafkaProducerTestContext<>(testInfo, serializer);

when(ctx.transactionManager.isPrepared()).thenReturn(true);
when(ctx.sender.isRunning()).thenReturn(true);

// Create prepared states with matching values
long producerId = 12345L;
short epoch = 5;
PreparedTxnState currentState = new PreparedTxnState(producerId, epoch);
PreparedTxnState inputState = new PreparedTxnState(producerId, epoch);

// Set up the transaction manager to return the prepared state
when(ctx.transactionManager.preparedTransactionState()).thenReturn(currentState);

// Should trigger commit when states match
TransactionalRequestResult commitResult = mock(TransactionalRequestResult.class);
when(ctx.transactionManager.beginCommit()).thenReturn(commitResult);

try (KafkaProducer<String, String> producer = ctx.newKafkaProducer()) {
// Call completeTransaction with the matching state
producer.completeTransaction(inputState);

// Verify methods called in order
verify(ctx.transactionManager).isPrepared();
verify(ctx.transactionManager).preparedTransactionState();
verify(ctx.transactionManager).beginCommit();

// Verify abort was never called
verify(ctx.transactionManager, never()).beginAbort();

// Verify sender was woken up
verify(ctx.sender).wakeup();
}
}

@Test
public void testCompleteTransactionWithNonMatchingState() throws Exception {
StringSerializer serializer = new StringSerializer();
KafkaProducerTestContext<String> ctx = new KafkaProducerTestContext<>(testInfo, serializer);

when(ctx.transactionManager.isPrepared()).thenReturn(true);
when(ctx.sender.isRunning()).thenReturn(true);

// Create txn prepared states with different values
long producerId = 12345L;
short epoch = 5;
PreparedTxnState currentState = new PreparedTxnState(producerId, epoch);
PreparedTxnState inputState = new PreparedTxnState(producerId + 1, epoch);

// Set up the transaction manager to return the prepared state
when(ctx.transactionManager.preparedTransactionState()).thenReturn(currentState);

// Should trigger abort when states don't match
TransactionalRequestResult abortResult = mock(TransactionalRequestResult.class);
when(ctx.transactionManager.beginAbort()).thenReturn(abortResult);

try (KafkaProducer<String, String> producer = ctx.newKafkaProducer()) {
// Call completeTransaction with the non-matching state
producer.completeTransaction(inputState);

// Verify methods called in order
verify(ctx.transactionManager).isPrepared();
verify(ctx.transactionManager).preparedTransactionState();
verify(ctx.transactionManager).beginAbort();

// Verify commit was never called
verify(ctx.transactionManager, never()).beginCommit();

// Verify sender was woken up
verify(ctx.sender).wakeup();
}
}

@Test
public void testClusterAuthorizationFailure() throws Exception {
int maxBlockMs = 500;
Expand Down