21 November, 2020

Postgres transactions with jasync-sql on Ktor

Over the past few weeks I've been working on non-blocking database connectivity for my Kotlin web stack. I'm using Ktor as a web framework and Postgres for my database. I'm using jasync-sql as a lightweight, non-blocking driver and so far it's worked great. I've finished the logic for the (very minimal) ORM, and the final piece of the puzzle is to get database transactions playing nicely.

The target

jasync-sql ships with functionality for executing queries as part of a transaction:

// Defined in Connection.kt
fun <A> inTransaction(f: (Connection) -> CompletableFuture<A>): CompletableFuture<A>

but in order to call this we need a reference to a Connection instance (obtained from the ConnectionPool) whenever we want to start a transaction:

class CustomerService(
    private val customerDao: CustomerDAO,
    private val connection: Connection // <- We have to add this 
) {

    fun processCustomer(id: String) {
        connection.inTransaction {
            
        }   
    }

}

The way I use Ktor means that I'd like the ability to start transactions in my logic layer classes without making them dependent on the ConnectionPool. A static withTransaction {} function seems like a good way to go.

withTransaction {
    // All queries executed on this coroutine or any of its children 
    // should be part of the same transaction.
}

The Approach

At the moment I'm using Kodein to inject a singleton instance of a ConnectionPool in my data layer classes and calling sendPreparedStatement on the pool to dispatch queries to the database. For queries that happen outside of a transaction this is still the desired behaviour. For queries that happen as part of a transaction, we need to ensure that:

  • They're executed on the same connection
  • We first execute a BEGIN TRANSACTION statement and after we're done, either a COMMIT or ROLLBACK statement on

the connection.

To achieve this we'll need to wrap the ConnectionPool in a class that includes some extra functionality. We can inject this class in the data layer as before. When withTransaction is called we'll take a connection from the pool, open a transaction and store it in the current coroutine context. Instead of dispatching queries straight to the pool, we can retrieve the connection from the context and execute the query as part of the transaction associated with the current coroutine.

The Code

The TransactionManager class wraps a connectionPool instance and we'll inject a singleton instance of this class into my data layer classes instead of the connectionPool. Because the static calls to withTransaction need access to the singleton instance of the TransactionManager, I'm making the primary constructor private and using a static singleton pattern to create an instance:

// TransactionManager.kt
class TransactionManager private constructor(
    private val connectionPool: ConnectionPool<PostgreSQLConnection>
) {    

    companion object {
    
        private var transactionManager: TransactionManager? = null

        @Synchronized
        fun getManager(
            username: String,
            password: String,
            database: String,
            host: String = "localhost",
            port: Int = 5432,
            maxActiveConnections: Int = 8 // Use-case dependent
        ): TransactionManager = transactionManager ?: TransactionManager(
            PostgreSQLConnectionBuilder.createConnectionPool {
                this.username = username
                this.password = password
                this.database = database
                this.host = host
                this.port = port
                this.maxActiveConnections = maxActiveConnections
            }
        ).also { transactionManager = it }
    }
}

We can also define the sendQuery function that will be the replacement for the sendPreparedStatement function on the ConnectionPool:

fun sendQuery(query: String, values: List<Any?> = emptyList()) 
    = connectionPool.sendPreparedStatement(query, values)

At this point we've got a replacement for the ConnectionPool and I can wire this up using Kodein:

bind<TransactionManager>() with singleton {
    TransactionManager.getManager(
        username = config("db.user"),
        password = config("db.password"),
        database = config("db.database"),
        host = config("db.host")
    )
}

Implementing transactions

When we start a transaction we'll need to hold a reference to the active connection in the coroutine context. The first thing we'll need is a coroutine context element to allow us to load and store the connection:

class TransactionElement(
    val connection: PostgreSQLConnection
) : CoroutineContext.Element {
    companion object Key : CoroutineContext.Key<TransactionElement>

    override val key: CoroutineContext.Key<TransactionElement>
        get() = TransactionElement
}

Now we can work on the logic for the static inTransaction call that'll wrap a block in a transaction. The basic logic will be:

  1. Check that the transactionManager has been initialised.
  2. If we're choosing to propagate the existing transaction we don't need to do anything.
  3. If we're choose to create a new transaction, either because we're not already in a transaction or because we don't

want to propagate the existing one, grab a new connection from the pool and store it in a new TransactionElement.

  1. Launch a new coroutine with the existing context if we're reusing a connection or with the existing context plus

the new TransactionElement if we're starting a new transaction. The context api replaces existing elements of the same type when an existing context is added to a new one. It makes operations like this as simple as adding the new element to the existing context and you can read more about it here.

The basic signature looks something like this:

suspend fun <T> inTransaction(
    isolation: Isolation = Isolation.READ_COMMITTED,
    propagation: Propagation = Propagation.INHERIT,
    block: suspend () -> T
): T

I've created enum classes to represent transaction isolation and propagation behaviour and have allowed the function to accept a suspending block. The first thing we'll do is check that the transaction manager is initialised and determine what the context for the launched coroutine should be:

if (transactionManager == null) throw IllegalStateException("Transaction manager not initialised")

val context = when {
    propagation == Propagation.INHERIT
        && coroutineContext[TransactionElement.Key] != null -> coroutineContext
    else -> coroutineContext + TransactionElement(transactionManager!!.connectionPool.take().await())
}

We're ready to start the transaction:

return withContext(context) {

    val connection = coroutineContext[TransactionElement.Key]!!.connection
    connection.sendQuery("BEGIN TRANSACTION ISOLATION LEVEL ${isolation.value}").await()

    runCatching { block() }
        .onSuccess { connection.sendQuery("COMMIT").await() }
        .onFailure { connection.sendQuery("ROLLBACK").await() }
        .also { transactionManager!!.connectionPool.giveBack(connection) }
        .getOrThrow()
}

Here we're launching a new coroutine with the context we built in the previous step and fetching the stored connection from it. Next we send a statement to Postgres to start the transaction; being careful to .await() the result to avoid sending concurrent transactions to the raw connection as they have no buffering mechanism.

We can now execute the supplied block, using the Result api to commit the transaction when the block completes or roll it back if an exception is thrown. We return the connection to the pool and return the result of the block invocation or rethrow the caught exception.

The last step is modifying the sendQuery function to use the connection stored in the TransactionElement if it exists and to dispatch to the connection pool if it doesn't:

suspend fun sendQuery(query: String, values: List<Any?> = emptyList()) 
= when (val transaction = coroutineContext[TransactionElement.Key]) {
    null -> connectionPool.sendPreparedStatement(query, values)
    else -> transaction.connection.sendPreparedStatement(query, values)
}

Summary

The code here implements simple Postgres transaction support using jasync-sql and should help you get another step closer to a fully async stack Ktor web stack. Off the top of my head there are a few issues with it but it should be a good start:

  • I haven't implemented a buffer for the connection which means that any concurrent queries executed will break. I've

left this as an exercise for the reader as it's pretty trivial.

  • I think the TransactionManager initialisation code is thread safe but I haven't tested this in anger.
  • Every new transaction takes a connection from the pool. If you nest n + 1 calls to inTransaction with CREATES_NEW

propagation, you will exhaust a pool of size n. If that happens the whole thing deadlocks as parent transaction coroutines remain suspended indefinitely waiting for their nested transaction to acquire a connection from the pool.

The full code from TransactionManager.kt is shown below and if you've got any questions or suggestions, drop me an email on henry (dot) course (at) gmail or on Kotlin Slack.

package com.henrycourse.util.db

import com.github.jasync.sql.db.pool.ConnectionPool
import com.github.jasync.sql.db.postgresql.PostgreSQLConnection
import com.github.jasync.sql.db.postgresql.PostgreSQLConnectionBuilder
import kotlinx.coroutines.future.await
import kotlinx.coroutines.withContext
import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.coroutineContext

class TransactionManager private constructor(
    private val connectionPool: ConnectionPool<PostgreSQLConnection>
) {

    suspend fun sendQuery(query: String, values: List<Any?> = emptyList()) = when (
        val transaction = coroutineContext[TransactionElement.Key]) {
        null -> connectionPool.sendPreparedStatement(query, values)
        else -> transaction.connection.sendPreparedStatement(query, values)
    }

    companion object {

        private var transactionManager: TransactionManager? = null

        @Synchronized
        fun getManager(
            username: String,
            password: String,
            database: String,
            host: String = "localhost",
            port: Int = 5432,
            maxActiveConnections: Int = 8
        ): TransactionManager = transactionManager ?: TransactionManager(
            PostgreSQLConnectionBuilder.createConnectionPool {
                this.username = username
                this.password = password
                this.database = database
                this.host = host
                this.port = port
                this.maxActiveConnections = maxActiveConnections
            }
        ).also { transactionManager = it }

        suspend fun <T> inTransaction(
            isolation: Isolation = Isolation.READ_COMMITTED,
            propagation: Propagation = Propagation.INHERIT,
            block: suspend () -> T
        ): T {

            if (transactionManager == null) throw IllegalStateException("Transaction manager not initialised")

            val context = when {
                propagation == Propagation.INHERIT
                    && coroutineContext[TransactionElement.Key] != null -> coroutineContext
                else -> coroutineContext + TransactionElement(transactionManager!!.connectionPool.take().await())
            }

            return withContext(context) {

                val connection = coroutineContext[TransactionElement.Key]!!.connection
                connection.sendQuery("BEGIN TRANSACTION ISOLATION LEVEL ${isolation.value}").await()

                runCatching { block() }
                    .onSuccess { connection.sendQuery("COMMIT").await() }
                    .onFailure { connection.sendQuery("ROLLBACK").await() }
                    .also { transactionManager!!.connectionPool.giveBack(connection) }
                    .getOrThrow()
            }
        }
    }
}

class TransactionElement(
    val connection: PostgreSQLConnection
) : CoroutineContext.Element {
    companion object Key : CoroutineContext.Key<TransactionElement>

    override val key: CoroutineContext.Key<TransactionElement>
        get() = TransactionElement
}

enum class Isolation(val value: String) {
    READ_COMMITTED("READ COMMITTED"),
    READ_UNCOMMITTED("READ UNCOMMITTED"),
    REPEATABLE_READ("REPEATABLE READ"),
    SERIALIZABLE("SERIALIZABLE")
}

enum class Propagation {
    INHERIT,
    CREATE_NEW
}

© 2023 Henry Course