Parallel execution: A Hibernate Reactive Gotcha
Hibernate Reactive is a reactive API for Hibernate, enabling you to create applications that use Hibernate for persistence but that don’t rely on JDBC and, unlike regular Hibernate on JDBC, work without blocking any threads.
Hibernate Reactive gives you a reactive Session, with many of the operations that are familiar to Hibernate users, except that they don’t perform their work in a blocking way but instead return an asynchronous type.
The reactive Session comes in two flavours: Stage.Session, which uses Java’s CompletionStage as the async type, and Mutiny.Session, which uses Uni, the asynchronous type from Red Hat’s Mutiny library.
In this blog post, I’m using the Mutiny flavour, which is what you’d typically use in a Quarkus application.
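For example, a simple query with the Mutiny API looks roughly like this (the Book entity, the bookId parameter and the injected Mutiny.SessionFactory are assumptions, just to sketch the shape of the API):
@Inject
Mutiny.SessionFactory sessionFactory;

public Uni<Book> findBook(Long bookId) {
    // withSession opens a session, runs the given work, and closes the session again;
    // the result is a Uni instead of a blocking return value.
    return sessionFactory.withSession(session -> session.find(Book.class, bookId));
}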
Evaluating Unis in parallel
When dealing with Unis, a common need is to evaluate them in parallel. For example, when we call two web services whose responses are each returned as a Uni, we can combine the results using Uni.combine().all():
Uni<String> one = service1.call();
Uni<String> two = service2.call();
Uni.combine()
.all()
.unis(one, two)
.combinedWith((first, second) -> first + " " + second);
This will subscribe to both Unis in parallel.
In fact, it’s quite common to subscribe to multiple Unis in parallel. A consumer of Unis would typically expect that it can do so.
Hibernate Reactive Unis
The Unis that you get back from Hibernate Reactive are different though! The Hibernate documentation has a pretty clear warning:
The session is not thread-safe (or "stream-safe"), so using it across different threads (or reactive streams) may cause bugs that are extremely hard to detect. Don’t say we didn’t warn you!
If you’re an unsuspecting person thinking "I don’t need to read the Hibernate docs, because I already know Unis", you might be inclined to evaluate multiple Unis that you get from Hibernate Reactive in parallel, for example using Uni.join() or Uni.combine().
If you do that, you’ll run into "wonderful" errors such as:
2023-05-19 14:39:10,359 ERROR [io.sma.graphql] (vert.x-eventloop-thread-2) SRGQL012000: Data Fetching Error: java.lang.IllegalStateException: Session/EntityManager is closed
at org.hibernate.internal.AbstractSharedSessionContract.checkOpen(AbstractSharedSessionContract.java:429)
or
2023-05-19 14:42:48,145 ERROR [io.sma.graphql] (vert.x-eventloop-thread-1) SRGQL012000: Data Fetching Error: java.lang.IllegalStateException: Illegal pop() with non-matching JdbcValuesSourceProcessingState
What’s going on exactly?
As mentioned in the docs, Hibernate Reactive doesn’t support using the same Session from multiple reactive streams. This is effectively what you do when you evaluate multiple Unis that use the same Session in parallel.
Concurrent opening of a Session with Quarkus SessionOperations is also not supported, which is something that could happen if you evaluate multiple Unis that use the SessionOperations.withSession or SessionOperations.withTransaction methods, or the @WithSession or @WithTransaction annotations.
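To make this concrete, here’s a sketch of the problematic pattern (the repo methods are hypothetical; assume they return Unis backed by the session that @WithSession opens):
@WithSession
public Uni<String> fooAndBar() {
    Uni<String> foo = repo.getFoo();
    Uni<String> bar = repo.getBar();
    // Joining subscribes to both Unis concurrently on the same session,
    // which leads to errors like the ones shown above.
    return Uni.join().all(foo, bar).andFailFast()
            .map(results -> results.get(0) + " " + results.get(1));
}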
It’s interesting to realize that it’s not only parallel execution (on multiple threads) that’s problematic here, but also concurrent execution of two streams on the same thread. The latter is what typically happens, because Quarkus runs everything belonging to the same Vert.x context on the same Vert.x event loop thread.
Conceptually, what happens is that the first stream makes a query to the DB and stores some state in the Vert.x context. Then it hits an async boundary, because it needs to wait for the DB to return a response. The scheduler then runs the second stream (on the same thread), which finds unexpected state in the Vert.x context and throws an exception.
How to prevent this?
If your own code evaluates Unis in parallel
If your own code evaluates Unis in parallel, you have several options:
The most obvious option is to use .usingConcurrencyOf(1) when using Uni.join() or Uni.combine(), for example:
Uni.join().all(one, two).usingConcurrencyOf(1).andFailFast(); // Will evaluate `one` and `two` sequentially.
Another way is to use flatMap for more explicit sequential evaluation:
one.flatMap(x -> two.map(y -> x + " " + y)); // Will evaluate `one` and `two` sequentially.
If someone else’s code evaluates Unis in parallel
Sometimes you pass Unis from Hibernate Reactive to some library for further processing. In that case, you may not have control over how this library evaluates the Unis.
This bit us in the past when using Hibernate Reactive together with SmallRye GraphQL: https://github.com/quarkusio/quarkus/issues/32870
In this scenario, the GraphQL engine decides to evaluate Unis in parallel (which it should, according to the GraphQL spec!), which is not allowed for Hibernate Unis.
In the next sections, we’ll dive a little deeper into the fundamental problem, how other libraries have solved it, and what we can do in our situation.
What’s the problem here?
In my opinion, the fundamental problem is that Hibernate Reactive doesn’t return 'regular' Unis, but Unis with an additional instruction manual. The instruction manual (don’t evaluate them in parallel!) doesn’t show up in the type, so developers won’t know about it until they read the manual. Worse, other libraries won’t know about it either and will violate its rules.
Another approach: Doobie
Doobie is a popular database access library for Scala, which has solved this by using a specific type, ConnectionIO, which allows only sequential composition, not parallel composition. You can convert from this type to a more general type (somewhat like Uni), but at that point you define the transaction boundary.
So the entire instruction manual ("you cannot run database work in the same transaction in parallel"), which is similar to Hibernate’s constraint, is encoded in the type system. Beautiful.
Making your Unis safe
So, the problem is that Hibernate Unis should not be evaluated in parallel, and that libraries that consume the Unis are not aware of this restriction.
So naturally, we could ask: can we remove this constraint, and make our Unis safe for parallel execution?
The answer is that we can, and it’s not terribly complicated.
What we want to do is shift the responsibility for sequential execution from outside the Unis to inside the Unis, so that if two Hibernate Reactive Unis are run in parallel, one of them waits with executing the actual Hibernate work until the other is done.
We will make a mutex for Unis, allowing multiple Unis that are evaluated in parallel to sequence themselves without blocking any threads.
A Uni Mutex
We want to create a semaphore that we can use to protect a critical section:
public interface UniSemaphore {
<T> Uni<T> protect(Supplier<Uni<T>> inner);
}
If we have an instance of this UniSemaphore, we can protect critical sections by calling the protect method. The returned Uni<T> will acquire a permit from the semaphore before executing the inner Uni, and release the permit when the work completes (either with a value or with a failure).
Here’s an implementation:
import java.util.Queue;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.function.Supplier;

import io.smallrye.mutiny.Uni;
import io.smallrye.mutiny.subscription.UniEmitter;

class UniSemaphoreImpl implements UniSemaphore {

    private int permits;
    private final Queue<UniEmitter<? super Void>> queue;

    public UniSemaphoreImpl(int permits) {
        assert (permits > 0);
        this.permits = permits;
        queue = new LinkedBlockingDeque<>();
    }

    @Override
    public <T> Uni<T> protect(Supplier<Uni<T>> inner) {
        // Acquire a permit, run the inner Uni, and release the permit when the
        // work completes, whether with a value or with a failure.
        return acquire()
                .onItem().transformToUni(ignored -> inner.get())
                .eventually(this::release);
    }

    private Uni<Void> release() {
        return Uni.createFrom().item(() -> {
            synchronized (this) {
                // Hand the permit to the next waiter, or put it back into the pool.
                UniEmitter<? super Void> next = queue.poll();
                if (next == null) {
                    permits++;
                } else {
                    next.complete(null);
                }
                return null;
            }
        });
    }

    private Uni<Void> acquire() {
        return Uni.createFrom().deferred(() -> {
            synchronized (this) {
                if (permits >= 1) {
                    // A permit is available: take it and continue immediately.
                    permits--;
                    return Uni.createFrom().voidItem();
                } else {
                    // No permit available: park this subscriber in the queue,
                    // without blocking the thread.
                    return Uni.createFrom().<Void>emitter(queue::add);
                }
            }
        });
    }
}
The protect method wraps the Uni with the work (typically the Uni doing the Hibernate Reactive calls) between an acquire and a release.
acquire checks whether a permit is available; if so, it takes one and the work can start. If not, it adds an emitter to the queue and immediately returns a Uni that only completes once a permit is handed over. This way, it doesn’t block the thread if there’s no permit available.
release runs when the work completes or fails, and either hands the permit to the next queued waiter or returns it to the pool.
Now we can convert 'unsafe' Unis into 'safe' Unis that a user can safely run in parallel:
Uni<String> unsafe1 = repo.getFoo();
Uni<String> unsafe2 = repo.getBar();
Uni.join().all(unsafe1, unsafe2).andFailFast(); // This will cause Hibernate exceptions
// Now make them 'safe':
UniSemaphore mutex = new UniSemaphoreImpl(1);
Uni<String> safe1 = mutex.protect(() -> unsafe1);
Uni<String> safe2 = mutex.protect(() -> unsafe2);
Uni.join().all(safe1, safe2).andFailFast(); // This will work properly
Alternative approach
An alternative approach to executing Unis in parallel is to make sure each Uni runs on its own Vert.x context, each with its own Hibernate Reactive Session:
public static <T> Uni<T> runOnDuplicatedContext(Supplier<Uni<T>> uni) {
    // Create a fresh duplicated Vert.x context and mark it as safe to use.
    Context ctx = VertxContext.createNewDuplicatedContext();
    VertxContextSafetyToggle.setContextSafe(ctx, true);
    // Subscribe to the supplied Uni on the duplicated context and forward its result.
    return Uni.createFrom().emitter(e ->
            ctx.runOnContext(ignore -> uni.get().subscribe().with(e::complete, e::fail)));
}
This allows true parallel execution of Unis, at the expense of losing transactionality.
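As a hypothetical usage example (assuming repo.getFoo() and repo.getBar() each open their own session, for example via @WithSession):
Uni<String> one = runOnDuplicatedContext(() -> repo.getFoo());
Uni<String> two = runOnDuplicatedContext(() -> repo.getBar());
// Each Uni now runs on its own duplicated context with its own session,
// so joining them in parallel is fine.
Uni<String> combined = Uni.join().all(one, two).andFailFast()
        .map(results -> results.get(0) + " " + results.get(1));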
Next steps
In a follow-up blog post, we’ll show how to use CDI interceptors to do the wrapping of Unis. This makes the conversion from a Uni you can’t run in parallel to one that you can a little nicer.