Transaction Retries using JavaEE and CDI with BMTs
In a previous post, we demonstrated client-side transaction retries by using meta-annotations and Aspect Oriented Programming (AOP) in Spring Boot.
In this post, we'll use a similar concept for a different Java stack: JavaEE (or JakartaEE as it's known today) and interceptors using the Contexts and Dependency Injection (CDI) framework.
Transaction Retries in JavaEE
This article demonstrates an AOP-driven retry strategy for JavaEE apps using the following stack:
Stateless session beans with bean-managed transactions
@AroundInvoke interceptor for retries
@TransactionBoundary meta-annotation with the interceptor binding
JAX-RS REST endpoint for testing
TomEE 8 as an embedded container with the web profile
JPA and Hibernate for data access
Source Code
The source code for the examples in this article can be found on GitHub.
Solution
The Interceptor or proxy pattern is generally used to add cross-cutting functionality or logic in an application without code duplication. Transaction management is a typical cross-cutting concern which is provided out of the box in JavaEE. Public bean methods in stateless/stateful session beans are automatically transactional, given that container-managed transactions (CMTs) are used.
In this example, the service method acting as a transaction boundary should always use REQUIRES_NEW propagation.
@Stateless
public class OrderService {
    @PersistenceContext(unitName = "orderSystemPU")
    private EntityManager entityManager;

    @TransactionAttribute(TransactionAttributeType.REQUIRES_NEW)
    public List<Order> findAllOrders() {
        CriteriaQuery<Order> cq = entityManager.getCriteriaBuilder().createQuery(Order.class);
        cq.select(cq.from(Order.class));
        return entityManager.createQuery(cq).getResultList();
    }
}
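The interceptor (proxy) idea itself can be illustrated without a container. The following is a minimal sketch using a JDK dynamic proxy; the `Greeter` interface, the `ProxySketch` class, and the logging behaviour are hypothetical stand-ins and not part of the article's source code. A container builds comparable proxies around session beans for you.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Proxy;
import java.util.ArrayList;
import java.util.List;

// Hypothetical business interface, used only for this illustration.
interface Greeter {
    String greet(String name);
}

final class ProxySketch {
    // Records what the "interceptor" saw, so the cross-cutting behaviour is observable.
    static final List<String> CALLS = new ArrayList<>();

    // Wraps any Greeter so that every call is recorded before it reaches the target.
    static Greeter withLogging(Greeter target) {
        InvocationHandler handler = (proxy, method, args) -> {
            CALLS.add(method.getName());
            try {
                return method.invoke(target, args);
            } catch (InvocationTargetException e) {
                throw e.getCause(); // rethrow the target's own exception
            }
        };
        return (Greeter) Proxy.newProxyInstance(
                Greeter.class.getClassLoader(), new Class<?>[] {Greeter.class}, handler);
    }

    public static void main(String[] args) {
        Greeter greeter = withLogging(name -> "Hello, " + name);
        System.out.println(greeter.greet("order service"));
        System.out.println("intercepted: " + CALLS);
    }
}
```

The target object contains no logging code at all; the behaviour is added from the outside, which is the same property the retry interceptor relies on.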
A natural extension to transparent transaction management is transaction retries. If a transaction fails with a transient serialization error, it should be automatically retried several times. This behaviour fits well into an interceptor, which we'll apply through an interceptor binding (an annotation).
Applied to the same service, it looks like this:
@Stateless
public class OrderService {
    @PersistenceContext(unitName = "orderSystemPU")
    private EntityManager entityManager;

    @TransactionBoundary
    public Order updateOrder(Order order) {
        // merge() returns the managed instance; return that rather than the detached argument
        return entityManager.merge(order);
    }
}
To mark classes and methods as repeatable transaction boundaries, let's first create the interceptor binding:
@Inherited
@InterceptorBinding
@Target({TYPE, METHOD})
@Retention(RUNTIME)
public @interface TransactionBoundary {
}
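The effect of `@Inherited` and the `TYPE`/`METHOD` targets can be checked with plain reflection. The sketch below uses a hypothetical stand-in annotation `Boundary` (the real `@TransactionBoundary` also carries `@InterceptorBinding`, which needs the CDI API on the classpath), and the service classes are invented for illustration. It only shows JDK reflection behaviour; how a container resolves interceptor bindings is governed by the CDI and Interceptors specifications.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Inherited;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

// Hypothetical stand-in for @TransactionBoundary, without @InterceptorBinding.
@Inherited
@Target({ElementType.TYPE, ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
@interface Boundary {
}

@Boundary
class BaseService {
    @Boundary
    public void update() {
    }
}

// Inherits the class-level annotation, but overrides the annotated method.
class DerivedService extends BaseService {
    @Override
    public void update() {
    }
}

final class BindingSketch {
    static boolean onClass(Class<?> type) {
        return type.isAnnotationPresent(Boundary.class);
    }

    static boolean onMethod(Class<?> type, String name) {
        try {
            return type.getMethod(name).isAnnotationPresent(Boundary.class);
        } catch (NoSuchMethodException e) {
            throw new IllegalArgumentException("no such public method: " + name, e);
        }
    }

    public static void main(String[] args) {
        System.out.println("class-level on subclass: " + onClass(DerivedService.class));
        System.out.println("method-level on override: " + onMethod(DerivedService.class, "update"));
    }
}
```

In plain Java, `@Inherited` applies to class-level annotations only, so the overriding method in `DerivedService` does not carry the annotation even though the subclass itself does.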
After creating the interceptor binding we need to create the actual interceptor implementation:
@TransactionBoundary
@Interceptor
@Priority(Interceptor.Priority.APPLICATION)
public class TransactionRetryInterceptor {
    public static final int MAX_RETRY_ATTEMPTS = 10;
    public static final int MAX_BACKOFF_TIME_MILLIS = 15000;

    @PersistenceContext(unitName = "orderSystemPU")
    private EntityManager entityManager;

    @Inject
    private TransactionService transactionService;

    @Inject
    private Logger logger;

    @AroundInvoke
    public Object aroundTransactionBoundary(InvocationContext ctx) throws Exception {
        // The retry loop must start outside of any transaction
        Assert.isFalse(entityManager.isJoinedToTransaction(), "Expected no transaction!");
        logger.info("Intercepting transactional method in retry loop: {}", ctx.getMethod().toGenericString());

        // Inclusive bound, so that up to MAX_RETRY_ATTEMPTS attempts are made
        for (int attempt = 1; attempt <= MAX_RETRY_ATTEMPTS; attempt++) {
            try {
                Object rv = transactionService.executeWithinTransaction(ctx::proceed);
                if (attempt > 1) {
                    logger.info("Recovered from transient error (attempt {}): {}",
                            attempt, ctx.getMethod().toGenericString());
                } else {
                    logger.info("Transactional method completed (attempt {}): {}",
                            attempt, ctx.getMethod().toGenericString());
                }
                return rv;
            } catch (Exception ex) {
                Throwable t = ExceptionUtils.getMostSpecificCause(ex);
                if (t instanceof SQLException) {
                    SQLException sqlException = (SQLException) t;
                    if (PSQLState.SERIALIZATION_FAILURE.getState().equals(sqlException.getSQLState())) {
                        // Exponential backoff with jitter, capped at MAX_BACKOFF_TIME_MILLIS.
                        // ThreadLocalRandom.current() must be called on the executing thread,
                        // not cached in a static field.
                        long backoffMillis = Math.min(
                                (long) (Math.pow(2, attempt) + ThreadLocalRandom.current().nextInt(0, 1000)),
                                MAX_BACKOFF_TIME_MILLIS);
                        logger.warn("Detected transient error (attempt {}) backoff for {}ms: {}",
                                attempt, backoffMillis, sqlException);
                        try {
                            Thread.sleep(backoffMillis);
                        } catch (InterruptedException e) {
                            Thread.currentThread().interrupt();
                        }
                    } else {
                        logger.info("Detected non-transient error (propagating): {}", t.getMessage());
                        throw ex;
                    }
                } else {
                    logger.info("Detected non-transient error (propagating): {}", t.getMessage());
                    throw ex;
                }
            }
        }
        throw new SQLTransactionRollbackException("Too many serialization conflicts - giving up retries!");
    }
}
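To experiment with the retry loop outside a container, its core can be reduced to plain JDK classes. This is a simplified sketch under a few assumptions: the class and method names are hypothetical, the task throws `SQLException` directly instead of a wrapped container exception, and the jitter bound is a parameter so the loop can be exercised quickly.

```java
import java.sql.SQLException;
import java.util.concurrent.Callable;
import java.util.concurrent.ThreadLocalRandom;

// Hypothetical, container-free sketch of the retry loop; not the article's interceptor.
final class RetrySketch {
    static final String SERIALIZATION_FAILURE = "40001";
    static final long MAX_BACKOFF_MILLIS = 15_000;

    // Same shape as the interceptor's formula: 2^attempt plus jitter, capped.
    static long backoffMillis(int attempt, int jitterMillis) {
        return Math.min((long) Math.pow(2, attempt) + jitterMillis, MAX_BACKOFF_MILLIS);
    }

    // Runs the task up to maxAttempts times, retrying only on SQLSTATE 40001.
    static <T> T retry(Callable<T> task, int maxAttempts, int maxJitterMillis) {
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return task.call();
            } catch (SQLException e) {
                if (!SERIALIZATION_FAILURE.equals(e.getSQLState())) {
                    throw new IllegalStateException("non-transient error", e);
                }
                int jitter = ThreadLocalRandom.current().nextInt(maxJitterMillis + 1);
                sleep(backoffMillis(attempt, jitter));
            } catch (Exception e) {
                throw new IllegalStateException("non-transient error", e);
            }
        }
        throw new IllegalStateException("giving up after " + maxAttempts + " attempts");
    }

    private static void sleep(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted during backoff", e);
        }
    }

    public static void main(String[] args) {
        int[] calls = {0};
        // Simulated task: conflicts twice, then succeeds.
        String result = retry(() -> {
            if (++calls[0] < 3) {
                throw new SQLException("restart transaction", SERIALIZATION_FAILURE);
            }
            return "committed";
        }, 10, 100);
        System.out.println(result + " after " + calls[0] + " attempts");
    }
}
```

Unlike the interceptor, this sketch stops retrying when the thread is interrupted during backoff; whether that is preferable depends on how the surrounding application handles shutdown.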
A few things about the interceptor are worth highlighting. First, the beans need to use bean-managed transactions (BMTs) so that transaction demarcation is deferred to a dedicated service, invoked from the interceptor. Otherwise, with container-managed transactions, the interceptor is called within the scope of the same transaction that may fail with a serialization error. A retry at that point would do no good, since the retry logic must be applied before the transaction is created.
Hence the purpose of `TransactionService`, which calls back to the original business method within a new transaction scope.
@Stateless
public class TransactionService {
    @TransactionAttribute(TransactionAttributeType.REQUIRES_NEW)
    public <T> T executeWithinTransaction(final Callable<T> task) throws Exception {
        return task.call();
    }
}
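Why the retry has to wrap the transaction, rather than run inside it, can be shown with a toy model. The sketch below is not JTA and makes a simplifying assumption: a `FakeTransaction` (a hypothetical class) stays unusable once any work in it has failed. All names are invented for illustration.

```java
import java.util.concurrent.Callable;

// Toy model, not JTA: a "transaction" that stays aborted once a conflict has hit it.
final class FakeTransaction {
    private boolean aborted;

    <T> T run(Callable<T> work) {
        if (aborted) {
            throw new IllegalStateException("transaction already aborted");
        }
        try {
            return work.call();
        } catch (Exception e) {
            aborted = true;
            throw new IllegalStateException("conflict", e);
        }
    }
}

final class BoundarySketch {
    // Retrying inside one transaction: every attempt reuses the same, aborted transaction.
    static boolean retryInsideTransaction(Callable<String> work, int attempts) {
        FakeTransaction tx = new FakeTransaction();
        for (int i = 0; i < attempts; i++) {
            try {
                tx.run(work);
                return true;
            } catch (IllegalStateException e) {
                // fall through and try again in the same transaction
            }
        }
        return false;
    }

    // Retrying around the transaction: each attempt gets a new one, similar to REQUIRES_NEW.
    static boolean retryAroundTransaction(Callable<String> work, int attempts) {
        for (int i = 0; i < attempts; i++) {
            try {
                new FakeTransaction().run(work);
                return true;
            } catch (IllegalStateException e) {
                // fall through and try again in a fresh transaction
            }
        }
        return false;
    }

    // Work that conflicts on its first call only.
    static Callable<String> failsOnce() {
        int[] calls = {0};
        return () -> {
            if (++calls[0] == 1) {
                throw new Exception("serialization conflict");
            }
            return "done";
        };
    }

    public static void main(String[] args) {
        System.out.println("inside: " + retryInsideTransaction(failsOnce(), 3));
        System.out.println("around: " + retryAroundTransaction(failsOnce(), 3));
    }
}
```

In this model only the second variant can recover, which mirrors the role of `executeWithinTransaction` being called once per attempt from the interceptor.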
The actual stateless service bean declares its intent to use bean-managed transactions:
@Stateless
@TransactionManagement(TransactionManagementType.BEAN)
public class OrderService {
...
To try this out, we'll use a very unsophisticated order system designed to produce unrepeatable read (aka read/write) conflicts to activate the retry mechanism.
We will use multiple concurrent connections and read and write to the same key but with different values. Without client-side retries, one of the transactions would be rolled back with a serialization conflict. With retries, the failed transaction will be retried and eventually run to completion, transparently towards the application logic.
Building
Prerequisites
JDK8+ with 1.8 language level (OpenJDK compatible)
Maven 3+ (optional, embedded)
CockroachDB v22.1+ database
Install the JDK (Linux):
sudo apt-get -qq install -y openjdk-8-jdk
Clone the project
git clone git@github.com:kai-niemi/retry-demo.git
cd retry-demo
Build the project
chmod +x mvnw
./mvnw clean install
Setup
Create the database:
cockroach sql --insecure --host=localhost -e "CREATE database orders"
Create the schema:
cockroach sql --insecure --host=localhost --database orders < src/resources/conf/create.sql
Start the app:
../mvnw clean install tomee:run
The default listen port is 8090 (can be changed in pom.xml).
Usage
Open another shell and check that the service is up and connected to the DB:
curl http://localhost:8090/api
Get Order Request Form
This prints out an order form template that we will use to create new orders:
curl http://localhost:8090/api/order/template | jq
Alternatively, pipe it to a file:
curl http://localhost:8090/api/order/template > form.json
Submit Order Form
Create a new purchase order:
curl http://localhost:8090/api/order -i -X POST \
-H 'Content-Type: application/json' \
-d '{
"billAddress": {
"address1": "Street 1.1",
"address2": "Street 1.2",
"city": "City 1",
"country": "Country 1",
"postcode": "Code 1"
},
"customerId": -1,
"deliveryAddress": {
"address1": "Street 2.1",
"address2": "Street 2.2",
"city": "City 2",
"country": "Country 2",
"postcode": "Code 2"
},
"requestId": "bc3cba97-dee9-41b2-9110-2f5dfc2c5dae"
}'
Or using the file:
curl http://localhost:8090/api/order -H "Content-Type:application/json" -X POST \
-d "@form.json"
Produce a Read/Write Conflict
Assume we have an order with ID 1 in status PLACED. We will now read that order and change its status to something else using concurrent transactions. This is known as the unrepeatable read conflict, which serializable isolation (1SR) prevents.
To have a predictable outcome, we'll use two sessions with a controllable delay between the read and write operations.
Overview of SQL operations:
select * from purchase_order where id=1; -- T1
-- status is `PLACED`
-- wait 5s (T1)
select * from purchase_order where id=1; -- T2
-- wait 5s (T2)
update purchase_order set status='CONFIRMED' where id=1; -- T1
update purchase_order set status='PAID' where id=1; -- T2
commit; -- T1
commit; -- T2 ERROR!
Prepare to run the first command:
curl http://localhost:8090/api/order/1?status=CONFIRMED\&delay=5000 -i -X PUT
Open another session and run a similar command within 5 seconds of the first one:
curl http://localhost:8090/api/order/1?status=PAID\&delay=5000 -i -X PUT
When both commands are started one after the other within that window, the overlapping transactions cause a serialization conflict like this:
ERROR: restart transaction: TransactionRetryWithProtoRefreshError: WriteTooOldError: write for key /Table/109/1/12/0 at timestamp 1669990868.355588000,0 too old; wrote at 1669990868.778375000,3: "sql txn" meta={id=92409d02 key=/Table/109/1/12/0 pri=0.03022202 epo=0 ts=1669990868.778375000,3 min=1669990868.355588000,0 seq=0} lock=true stat=PENDING rts=1669990868.355588000,0 wto=false gul=1669990868.855588000,0
The interceptor catches this error because it carries SQLSTATE code 40001, retries the business method, and eventually succeeds, delivering a 200 OK to the client.
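The check that recognises such an error can be sketched with JDK classes alone. This is an illustrative variant, not the article's code: the interceptor inspects only the most specific cause via a helper that is not shown, whereas this hypothetical `TransientErrorSketch` walks the whole cause chain looking for an `SQLException` with SQLSTATE 40001.

```java
import java.sql.SQLException;

// Hypothetical helper; the article's interceptor uses ExceptionUtils and PSQLState instead.
final class TransientErrorSketch {
    static final String SERIALIZATION_FAILURE = "40001";

    // Returns true if any exception in the cause chain is an SQLException with SQLSTATE 40001.
    static boolean isSerializationFailure(Throwable error) {
        int depth = 0;
        // Bound the walk so that a cyclic cause chain cannot loop forever.
        for (Throwable t = error; t != null && depth < 32; t = t.getCause(), depth++) {
            if (t instanceof SQLException
                    && SERIALIZATION_FAILURE.equals(((SQLException) t).getSQLState())) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        Throwable wrapped = new RuntimeException("rollback",
                new SQLException("restart transaction", SERIALIZATION_FAILURE));
        System.out.println("retryable: " + isSerializationFailure(wrapped));
    }
}
```

Walking the chain is slightly more lenient than checking only the root cause; either approach treats every other SQLSTATE as non-transient.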
Conclusion
In this article, we implemented a transaction retry strategy for JavaEE stateless session beans using bean-managed transactions and a custom interceptor with interceptor bindings. This reduces the retry logic in ordinary service beans to adding a @TransactionBoundary meta-annotation.
Written by Kai Niemi