30 April, 2020

Caching in Ktor with a JVM detour

Over the past few months I've been working with the Ktor web framework on some personal projects, and it has now replaced Spring as my go-to Kotlin web framework. If you haven't worked with Ktor, it's a Kotlin-native, non-blocking web framework that makes extensive use of Kotlin coroutines.

The Spring ecosystem is vast and there's always code out there somewhere for whatever you're trying to build. This leads to a familiar pattern:

  1. Need a new feature.
  2. Find the relevant Spring module.
  3. Spend anywhere from 2 minutes to 2 weeks figuring out which magic incantation is required to get the bean autoconfiguration to work with your existing code.

Most of the time this works pretty well. Spending an hour configuring code that other people have used, tested and validated is definitely less fun than writing your own implementation from scratch, but if productivity is the priority then it's hard to justify not going with the proven option (I can, of course, only speak for my own engineering skills though!).

Having said all this, there are some fundamental aspects of Spring that have left me looking for a different non-blocking framework. Spring Reactive does the job and has a lot of potential: it looks like it's going to be one of the first major frameworks with first-class reactive SQL driver support, via Spring Data R2DBC. But as a framework written in Java it feels a bit dated and doesn't take advantage of Kotlin language features.

I've been porting the Spring functionality I use over to Ktor as the need arises, and this post goes through my adventures with in-memory caching.

Caffeine

Caffeine is my JVM in-memory cache of choice and I've used it with Spring and other synchronous frameworks. I wanted to find a way to integrate it with Ktor that is both asynchronous and beautiful.

Caffeine provides an asynchronous API based on Java 8's CompletableFuture, which is trivial to interop with Kotlin coroutines. The kotlinx-coroutines-jdk8 library provides the conversion functions needed to transform futures into coroutine primitives and vice versa. A typical builder call for an AsyncLoadingCache looks something like:

// Where K is the key type and V is the value type:
val cache: AsyncLoadingCache<K, V> =
        Caffeine
            .newBuilder()
            .buildAsync { key: K, executor: Executor ->
                // ...loader function that returns CompletableFuture<V>
            }
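As a concrete instance of the builder above, here is a minimal sketch with the type parameters filled in. The cache name lengths and the string-length loader are made up for illustration, and the sketch assumes Caffeine is on the classpath:

```kotlin
import com.github.benmanes.caffeine.cache.AsyncLoadingCache
import com.github.benmanes.caffeine.cache.Caffeine
import java.util.concurrent.CompletableFuture
import java.util.concurrent.Executor

// Hypothetical cache mapping a string to its length.
val lengths: AsyncLoadingCache<String, Int> =
    Caffeine
        .newBuilder()
        .maximumSize(100)
        .buildAsync { key: String, executor: Executor ->
            // The loader runs on the executor Caffeine hands us and
            // returns a CompletableFuture<Int>.
            CompletableFuture.supplyAsync({ key.length }, executor)
        }

fun main() {
    // get() returns a CompletableFuture; join() blocks until it completes.
    println(lengths.get("ktor").join()) // 4
}
```

Blocking with join() is only for the demo; the point of the rest of the post is to await the future from a coroutine instead.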

Using the jdk8 library we can wrap a coroutine in a CompletableFuture, which lets us interop with future-based libraries such as Caffeine. To start with we create a coroutine scope in which to execute our async loading functions:

val cacheContext = CoroutineScope(Dispatchers.Default + SupervisorJob())

Note that we're using the default dispatcher and a SupervisorJob as the parent job. With a SupervisorJob, a loader that throws an exception fails only its own future; the scope and any other loaders running in it carry on.
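To see why the SupervisorJob matters, the following sketch runs one failing loader in a scope and then checks whether the scope is still active. The function name scopeSurvivesFailure is hypothetical, and the sketch assumes kotlinx-coroutines with the jdk8 integration is available:

```kotlin
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.Job
import kotlinx.coroutines.SupervisorJob
import kotlinx.coroutines.future.await
import kotlinx.coroutines.future.future
import kotlinx.coroutines.isActive
import kotlinx.coroutines.runBlocking

// Runs one failing loader in a scope built on `parent` and reports
// whether the scope is still usable afterwards.
fun scopeSurvivesFailure(parent: Job): Boolean = runBlocking {
    val scope = CoroutineScope(Dispatchers.Default + parent)
    val failing = scope.future<Int> { error("loader failed") }
    // The exception surfaces through the future; swallow it for the demo.
    runCatching { failing.await() }
    scope.isActive
}

fun main() {
    println(scopeSurvivesFailure(SupervisorJob())) // true
    println(scopeSurvivesFailure(Job()))           // false
}
```

With a plain Job the first failing loader cancels the whole scope, so every later load would fail as well. That is not what we want from a long-lived cache.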

We can use this scope in combination with the CoroutineScope.future {} function provided by the jdk8 library to pass our loader lambda to the Caffeine .buildAsync {} function:

.buildAsync { key: K, _ -> cacheContext.future { loader(key) } }

where loader is of type suspend (K) -> V. We now have a cache that loads new values asynchronously and is built on top of the Kotlin coroutine primitives that we know and love.
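Put together, the pieces so far might look like the runnable sketch below. The names slowSquare and squares are hypothetical stand-ins for a real suspending loader and its cache, and the sketch assumes Caffeine and kotlinx-coroutines (with the jdk8 integration) are on the classpath:

```kotlin
import com.github.benmanes.caffeine.cache.AsyncLoadingCache
import com.github.benmanes.caffeine.cache.Caffeine
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.SupervisorJob
import kotlinx.coroutines.delay
import kotlinx.coroutines.future.await
import kotlinx.coroutines.future.future
import kotlinx.coroutines.runBlocking

private val cacheContext = CoroutineScope(Dispatchers.Default + SupervisorJob())

// Hypothetical suspending loader standing in for an expensive call.
suspend fun slowSquare(n: Int): Int {
    delay(100)
    return n * n
}

// future {} turns the suspending loader into the CompletableFuture Caffeine expects.
val squares: AsyncLoadingCache<Int, Int> =
    Caffeine
        .newBuilder()
        .buildAsync { key: Int, _ -> cacheContext.future { slowSquare(key) } }

fun main() = runBlocking {
    println(squares.get(7).await()) // 49, computed by the loader
    println(squares.get(7).await()) // 49, served from the cache
}
```

The await() extension is the other half of the interop: it suspends the calling coroutine until the future completes rather than blocking a thread.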

Beauty

We could call it a day at this point but it'd be nice if we could get everything to integrate a bit better with the block DSL style that Ktor makes use of. As it stands, every time we want a new cache we have to declare it outside of the function in which we want to use it:

val cache = Caffeine
    .newBuilder()
    .buildAsync { key: K, _ ->
        // loader code here...
    }

// get() returns a CompletableFuture<V>; await() suspends until it completes
suspend fun returnsACachedValue(key: K) = cache.get(key).await()

It works but it's not elegant. Something like this would be much nicer, with the loader code appearing inline:

suspend fun returnsACachedValue(key: K) = cached(key) {
    // loader code here...
}

It's even trivial to make the cache that backs this function configurable by adding a cacheConfig parameter to cached() that takes a configuration lambda, leaving us with a signature that looks like:

suspend fun <K, V> cached(
    key: K,
    cacheConfig: Caffeine<Any, Any>.() -> Caffeine<Any, Any>,
    loader: suspend (K) -> V
): V
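The cacheConfig parameter is a function type with a receiver, so it can be called on a builder as if it were a method. A minimal sketch of that mechanism in isolation follows; the CacheConfig alias and the smallAndShortLived value are hypothetical names, and Caffeine is assumed to be on the classpath:

```kotlin
import com.github.benmanes.caffeine.cache.Caffeine
import java.util.concurrent.TimeUnit

// Hypothetical alias for the configuration lambda type used by cached().
typealias CacheConfig = Caffeine<Any, Any>.() -> Caffeine<Any, Any>

// A reusable configuration: inside the lambda, `this` is the builder.
val smallAndShortLived: CacheConfig = {
    maximumSize(100).expireAfterWrite(5, TimeUnit.MINUTES)
}

fun main() {
    // A value of a receiver function type can be invoked like an extension.
    val cache = Caffeine.newBuilder().smallAndShortLived().build<String, Int>()
    cache.put("a", 1)
    println(cache.getIfPresent("a")) // 1
}
```

Because the lambda returns the builder, the caller can chain any of Caffeine's builder methods without cached() having to know about them.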

The key(s) to the cache map

The reader will have noticed that we need to persist some state (the cache instance) between calls to cached(). To avoid having to declare a reference to a new cache every time we need one, we need a collection to store our caches in.

A map is an obvious choice for this but what should be used to key it? This section is more of a curiosity for anyone interested in how lambdas are implemented on the JVM. Skip to the next section if you're just after the working code.

The cache map needs to associate loader lambdas with cache instances. We have a reference to the loader lambda right to hand so let's give it a go. The following code demonstrates what someone who doesn't think too hard about how the JVM works (e.g. me) might expect:

fun main() {
    (0..2).forEach { cache(it) { it.toString() } }
}

fun cache(key: Int, loader: (Int) -> String): String {
    println("cache() called with loader ${loader.hashCode()}")
    return loader(key)
}

// Prints:
// cache() called with loader 1639622804
// cache() called with loader 1639622804
// cache() called with loader 1639622804

This toy example looks promising and it certainly seems like every time we invoke our cache function, the loader arguments are at least referentially equal.

There are unfortunately some problems here. For one, as this comment from Brian Goetz clarifies, this behaviour isn't guaranteed by the JVM spec and can change at any time. The only thing the spec guarantees (for reasons explained in the comment) is that two equal lambda references point to the same instance of the FunctionalInterface-implementing class - at best trivially useful.

The other issue (also touched on in the above comment) is that the loader lambda in the toy example above doesn't capture any variables from the enclosing scope (it is a non-capturing lambda). If you inspect the bytecode Kotlin generates for a non-capturing lambda you'll notice that it is read from a static field rather than constructed at each call, and hence we reuse the exact same instance every time.

Unfortunately, if we want to reference any variables from the enclosing scope in the loader (which we're going to want to do if we want our loader to do anything useful), our lambda becomes a capturing lambda. Instead of being read from a static field, a new instance of the lambda class is created each time the lambda expression is evaluated. This explains the following result (in which captured is captured from the enclosing scope on each invocation):

fun main() {
    val captured = ""
    (0..2).forEach { cache(it) { captured } }
}

fun cache(key: Int, loader: (Int) -> String): String {
    println("cache() called with loader ${loader.hashCode()}")
    return loader(key)
}

// Prints:
// cache() called with loader 1975358023
// cache() called with loader 2101440631
// cache() called with loader 2109957412

This rules out using the lambda instance as a key to the cache map.

The end result

That was of course just an interesting diversion. Even if we can't use a reference to the lambda instance as a key, we can use a reference to the synthetic class that backs the lambda. For our purposes this is all that is required: we want to associate cache instances with the computation represented by the loader lambda (i.e. the class backing the lambda), not with specific invocations of that computation (i.e. instances of that class).
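The difference between the lambda instance and its class can be checked with a small standard-library sketch. The helper name loaderClass is hypothetical, and the behaviour shown is what the Kotlin/JVM compiler does in practice rather than something the post's sources guarantee:

```kotlin
import kotlin.reflect.KClass

// Hypothetical helper: returns the class backing whatever lambda it is given.
fun loaderClass(loader: (Int) -> String): KClass<*> = loader::class

fun main() {
    // One call site, evaluated three times, capturing a different prefix each time.
    val fromLoop = listOf("a", "b", "c").map { prefix ->
        loaderClass { n -> "$prefix$n" }
    }
    println(fromLoop.toSet().size) // 1: same call site, same backing class

    // A different lambda expression is backed by a different class.
    val other = loaderClass { n -> "other$n" }
    println(other in fromLoop) // false
}
```

So keying on the class gives one cache per lambda expression in the source, which is the granularity we are after.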

The signature of the cache map ends up looking something like:

private val cacheMap: MutableMap<KClass<suspend (Any?) -> Any?>, AsyncLoadingCache<*, *>>

We can then cast this (unsafely) to the required type when we invoke the cache function:

val cacheMap = cacheMap as MutableMap<KClass<out suspend (K) -> V>, AsyncLoadingCache<K, V>>
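The same pattern, a type-erased registry keyed by the loader's class plus an unchecked cast back to the concrete types, can be sketched without Caffeine or coroutines. Everything here (registry, memoized, describe, loads) is a hypothetical, synchronous analogue for illustration only, not the code this post ends up with:

```kotlin
import java.util.concurrent.ConcurrentHashMap
import kotlin.reflect.KClass

// Type-erased registry: one value map per loader class.
private val registry = ConcurrentHashMap<KClass<*>, MutableMap<*, *>>()

@Suppress("UNCHECKED_CAST")
fun <K : Any, V : Any> memoized(key: K, loader: (K) -> V): V {
    // The registry cannot express "the map for this loader holds K and V",
    // so the cast is unchecked; it is sound only because a given loader
    // class always has the same K and V.
    val values = registry.getOrPut(loader::class) { ConcurrentHashMap<K, V>() } as MutableMap<K, V>
    return values.getOrPut(key) { loader(key) }
}

var loads = 0 // counts how often the loader actually runs

fun describe(n: Int): String = memoized(n) { loads++; "value-$it" }

fun main() {
    println(describe(1)) // value-1
    println(describe(1)) // value-1
    println(describe(2)) // value-2
    println(loads)       // 2
}
```

The compiler cannot verify the cast because generic type arguments are erased at runtime, which is why the warning has to be suppressed explicitly.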

At last we're ready to put it all together.

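import com.github.benmanes.caffeine.cache.AsyncLoadingCache
import com.github.benmanes.caffeine.cache.Caffeine
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.SupervisorJob
import kotlinx.coroutines.future.await
import kotlinx.coroutines.future.future
import kotlinx.coroutines.withContext
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.TimeUnit
import kotlin.reflect.KClass

private val cacheContext = CoroutineScope(Dispatchers.Default + SupervisorJob())
// ConcurrentHashMap rather than HashMap: cached() can be called from several threads at once
private val cacheMap: MutableMap<KClass<suspend (Any?) -> Any?>, AsyncLoadingCache<*, *>> = ConcurrentHashMap()

@Suppress("UNCHECKED_CAST")
suspend fun <K, V> cached(key: K,
                          cacheConfig: Caffeine<Any, Any>.() -> Caffeine<Any, Any> = {
                              this.maximumSize(10_000)
                                  .expireAfterWrite(3, TimeUnit.HOURS)
                          },
                          loader: suspend (K) -> V): V = withContext(cacheContext.coroutineContext) {

    val cacheMap = cacheMap as MutableMap<KClass<out suspend (K) -> V>, AsyncLoadingCache<K, V>>

    (cacheMap[loader::class] ?: Caffeine.newBuilder()
        .cacheConfig()
        .buildAsync { key: K, _ -> cacheContext.future { loader(key) } }
        .also { cacheMap[loader::class] = it })
        .get(key)
        .await()
}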

Here we're creating a scope for our loaders to execute in and initialising the cache map. The cached() function takes type parameters for the key and value, a key, an optional config lambda with default settings and a suspending loader function. In the body of the function we're:

  • Casting the cache map as described above
  • Retrieving a cache from the cache map if one exists
  • Otherwise creating a cache and storing it in the cache map via the code in the .also {} block
  • Calling the .get(key) function on the cache to retrieve a future containing the result
  • Calling the suspending .await() function on the future to get the value.

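And that's it. We've now got a fully configurable async cache that integrates nicely with coroutines and offers a concise and readable syntax as a bonus. One thing to keep in mind: the cache is built around the first loader instance seen for a given lambda class, so the loader should depend only on its key argument rather than on other captured values that change between calls.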

...
val aThing = cached(key) {
    anExpensiveCallThatReturnsTheRightThing(key)
}
...

© 2023 Henry Course