21 November, 2020
Postgres transactions with jasync-sql on Ktor
Over the past few weeks I've been working on non-blocking database connectivity for my Kotlin web stack. I'm using Ktor as a web framework and Postgres for my database. I'm using jasync-sql as a lightweight, non-blocking driver and so far it's worked great. I've finished the logic for the (very minimal) ORM, and the final piece of the puzzle is to get database transactions playing nicely.
The target
jasync-sql ships with functionality for executing queries as part of a transaction:
// Defined in Connection.kt
fun <A> inTransaction(f: (Connection) -> CompletableFuture<A>): CompletableFuture<A>
but in order to call this we need a reference to a Connection
instance (obtained from the ConnectionPool
) whenever
we want to start a transaction:
class CustomerService(
private val customerDao: CustomerDAO,
private val connection: Connection // <- We have to add this
) {
fun processCustomer(id: String) {
connection.inTransaction {
}
}
}
The way I use Ktor means that I'd like the ability to start transactions in my logic
layer classes without making them dependent on the ConnectionPool
. A static withTransaction {}
function seems like
a good way to go.
withTransaction {
// All queries executed on this coroutine or any of its children
// should be part of the same transaction.
}
The Approach
At the moment I'm using Kodein to inject a singleton instance of a ConnectionPool
in my data layer classes and
calling sendPreparedStatement
on the pool to dispatch queries to the database. For queries that happen outside of a
transaction this is still the desired behaviour. For queries that happen as part of a transaction, we need to ensure that:
- They're executed on the same connection
- We first execute a
BEGIN TRANSACTION
statement and after we're done, either aCOMMIT
orROLLBACK
statement on the connection.
To achieve this we'll need to wrap the ConnectionPool
in a class that includes some extra functionality. We
can inject this class in the data layer as before. When withTransaction
is called we'll take a connection
from the pool, open a transaction and store it in the current coroutine context. Instead of dispatching queries
straight to the pool, we can retrieve the connection from the context and execute the query as part of the
transaction associated with the current coroutine.
The Code
The TransactionManager
class wraps a connectionPool
instance and we'll inject a singleton instance of this class into
my data layer classes instead of the connectionPool
. Because the static calls to withTransaction
need access to the
singleton instance of the TransactionManager
, I'm making the primary constructor private and using a static
singleton pattern to create an instance:
// TransactionManager.kt
class TransactionManager private constructor(
private val connectionPool: ConnectionPool<PostgreSQLConnection>
) {
companion object {
private var transactionManager: TransactionManager? = null
@Synchronized
fun getManager(
username: String,
password: String,
database: String,
host: String = "localhost",
port: Int = 5432,
maxActiveConnections: Int = 8 // Use-case dependent
): TransactionManager = transactionManager ?: TransactionManager(
PostgreSQLConnectionBuilder.createConnectionPool {
this.username = username
this.password = password
this.database = database
this.host = host
this.port = port
this.maxActiveConnections = maxActiveConnections
}
).also { transactionManager = it }
}
}
We can also define the sendQuery
function that will be the replacement for the sendPreparedStatement
function
on the ConnectionPool
:
fun sendQuery(query: String, values: List<Any?> = emptyList())
= connectionPool.sendPreparedStatement(query, values)
At this point we've got a replacement for the ConnectionPool
and I can wire this up using Kodein:
bind<TransactionManager>() with singleton {
TransactionManager.getManager(
username = config("db.user"),
password = config("db.password"),
database = config("db.database"),
host = config("db.host")
)
}
Implementing transactions
When we start a transaction we'll need to hold a reference to the active connection in the coroutine context. The first thing we'll need is a coroutine context element to allow us to load and store the connection:
class TransactionElement(
val connection: PostgreSQLConnection
) : CoroutineContext.Element {
companion object Key : CoroutineContext.Key<TransactionElement>
override val key: CoroutineContext.Key<TransactionElement>
get() = TransactionElement
}
Now we can work on the logic for the static inTransaction
call that'll wrap a block in a transaction. The basic logic
will be:
- Check that the
transactionManager
has been initialised. - If we're choosing to propagate the existing transaction we don't need to do anything.
- If we're choose to create a new transaction, either because we're not already in a transaction or because we don't
want to propagate the existing one, grab a new connection from the pool and store it in a new
TransactionElement
. - Launch a new coroutine with the existing context if we're reusing a connection or with the existing context plus
the new
TransactionElement
if we're starting a new transaction. The context api replaces existing elements of the same type when an existing context is added to a new one. It makes operations like this as simple as adding the new element to the existing context and you can read more about it here.
The basic signature looks something like this:
suspend fun <T> inTransaction(
isolation: Isolation = Isolation.READ_COMMITTED,
propagation: Propagation = Propagation.INHERIT,
block: suspend () -> T
): T
I've created enum classes to represent transaction isolation and propagation behaviour and have allowed the function to accept a suspending block. The first thing we'll do is check that the transaction manager is initialised and determine what the context for the launched coroutine should be:
if (transactionManager == null) throw IllegalStateException("Transaction manager not initialised")
val context = when {
propagation == Propagation.INHERIT
&& coroutineContext[TransactionElement.Key] != null -> coroutineContext
else -> coroutineContext + TransactionElement(transactionManager!!.connectionPool.take().await())
}
We're ready to start the transaction:
return withContext(context) {
val connection = coroutineContext[TransactionElement.Key]!!.connection
connection.sendQuery("BEGIN TRANSACTION ISOLATION LEVEL ${isolation.value}").await()
runCatching { block() }
.onSuccess { connection.sendQuery("COMMIT").await() }
.onFailure { connection.sendQuery("ROLLBACK").await() }
.also { transactionManager!!.connectionPool.giveBack(connection) }
.getOrThrow()
}
Here we're launching a new coroutine with the context we built in the previous step and fetching the stored connection
from it. Next we send a statement to Postgres to start the transaction; being careful to .await()
the result to avoid
sending concurrent transactions to the raw connection as they have no buffering mechanism.
We can now execute the supplied block, using the Result
api to commit the transaction when the block completes
or roll it back if an exception is thrown. We return the connection to the pool and return the result of the block
invocation or rethrow the caught exception.
The last step is modifying the sendQuery
function to use the connection stored in the TransactionElement
if it
exists and to dispatch to the connection pool if it doesn't:
suspend fun sendQuery(query: String, values: List<Any?> = emptyList())
= when (val transaction = coroutineContext[TransactionElement.Key]) {
null -> connectionPool.sendPreparedStatement(query, values)
else -> transaction.connection.sendPreparedStatement(query, values)
}
Summary
The code here implements simple Postgres transaction support using jasync-sql and should help you get another step closer to a fully async stack Ktor web stack. Off the top of my head there are a few issues with it but it should be a good start:
- I haven't implemented a buffer for the connection which means that any concurrent queries executed will break. I've left this as an exercise for the reader as it's pretty trivial.
- I think the
TransactionManager
initialisation code is thread safe but I haven't tested this in anger. - Every new transaction takes a connection from the pool. If you nest n + 1 calls to
inTransaction
withCREATES_NEW
propagation, you will exhaust a pool of size n. If that happens the whole thing deadlocks as parent transaction coroutines remain suspended indefinitely waiting for their nested transaction to acquire a connection from the pool.
The full code from TransactionManager.kt
is shown below and if you've got any questions or suggestions, drop me an
email on henry (dot) course (at) gmail or on Kotlin Slack.
package com.henrycourse.util.db
import com.github.jasync.sql.db.pool.ConnectionPool
import com.github.jasync.sql.db.postgresql.PostgreSQLConnection
import com.github.jasync.sql.db.postgresql.PostgreSQLConnectionBuilder
import kotlinx.coroutines.future.await
import kotlinx.coroutines.withContext
import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.coroutineContext
class TransactionManager private constructor(
private val connectionPool: ConnectionPool<PostgreSQLConnection>
) {
suspend fun sendQuery(query: String, values: List<Any?> = emptyList()) = when (
val transaction = coroutineContext[TransactionElement.Key]) {
null -> connectionPool.sendPreparedStatement(query, values)
else -> transaction.connection.sendPreparedStatement(query, values)
}
companion object {
private var transactionManager: TransactionManager? = null
@Synchronized
fun getManager(
username: String,
password: String,
database: String,
host: String = "localhost",
port: Int = 5432,
maxActiveConnections: Int = 8
): TransactionManager = transactionManager ?: TransactionManager(
PostgreSQLConnectionBuilder.createConnectionPool {
this.username = username
this.password = password
this.database = database
this.host = host
this.port = port
this.maxActiveConnections = maxActiveConnections
}
).also { transactionManager = it }
suspend fun <T> inTransaction(
isolation: Isolation = Isolation.READ_COMMITTED,
propagation: Propagation = Propagation.INHERIT,
block: suspend () -> T
): T {
if (transactionManager == null) throw IllegalStateException("Transaction manager not initialised")
val context = when {
propagation == Propagation.INHERIT
&& coroutineContext[TransactionElement.Key] != null -> coroutineContext
else -> coroutineContext + TransactionElement(transactionManager!!.connectionPool.take().await())
}
return withContext(context) {
val connection = coroutineContext[TransactionElement.Key]!!.connection
connection.sendQuery("BEGIN TRANSACTION ISOLATION LEVEL ${isolation.value}").await()
runCatching { block() }
.onSuccess { connection.sendQuery("COMMIT").await() }
.onFailure { connection.sendQuery("ROLLBACK").await() }
.also { transactionManager!!.connectionPool.giveBack(connection) }
.getOrThrow()
}
}
}
}
class TransactionElement(
val connection: PostgreSQLConnection
) : CoroutineContext.Element {
companion object Key : CoroutineContext.Key<TransactionElement>
override val key: CoroutineContext.Key<TransactionElement>
get() = TransactionElement
}
enum class Isolation(val value: String) {
READ_COMMITTED("READ COMMITTED"),
READ_UNCOMMITTED("READ UNCOMMITTED"),
REPEATABLE_READ("REPEATABLE READ"),
SERIALIZABLE("SERIALIZABLE")
}
enum class Propagation {
INHERIT,
CREATE_NEW
}