Writing an object pool with synchronous channels: F# vs Kotlin
F#
This one I wrote long time ago in Hopac:
type private PoolEntry<'a> =
{ Value: 'a
mutable LastUsed: DateTime }
static member Create(value: 'a) = { Value = value; LastUsed = DateTime.UtcNow }
interface IDisposable with
member x.Dispose() =
// Mute exceptions those may be raised in instance's Dispose method to prevent the pool
// to stop looping.
try
match box x.Value with
| :? IDisposable as d -> d.Dispose()
| _ -> ()
with _ -> ()
/// Bounded pool of disposable objects. If number of given objects is equal to capacity then client will be blocked as it tries to get an instance.
/// If an object in pool is not used more then inactiveTimeBeforeDispose period of time, it's disposed and removed from the pool.
/// When the pool is disposing itself, it disposes all objects it caches first.
type ObjectPool<'a>(createNew: unit -> 'a, ?capacity: uint32, ?inactiveTimeBeforeDispose: TimeSpan) =
let capacity = defaultArg capacity 50u
let inactiveTimeBeforeDispose = defaultArg inactiveTimeBeforeDispose (TimeSpan.FromMinutes 1.)
let reqCh = Ch<Promise<unit> * Ch<Choice<PoolEntry<'a>, exn>>>()
let releaseCh = Ch<'a PoolEntry>()
let maybeExpiredCh = Ch<'a PoolEntry>()
let doDispose = IVar()
let hasDisposed = IVar()
let rec loop (available: 'a PoolEntry list, given: uint32) = Job.delay <| fun _ ->
// an instance returns to pool
let releaseAlt() =
releaseCh ^=> fun instance ->
instance.LastUsed <- DateTime.UtcNow
Job.start (timeOut inactiveTimeBeforeDispose >>=.
(maybeExpiredCh *<+ instance)) >>=.
loop (instance :: available, given - 1u)
// request for an instance
let reqAlt() =
reqCh ^=> fun (nack, replyCh) ->
let ok available instance =
replyCh *<- Ok instance ^=>. loop (available, given + 1u) <|>
nack ^=>. loop (instance :: available, given)
match available with
| instance :: available -> ok available instance
| [] -> try createNew () |> PoolEntry.Create |> ok available
with e -> (replyCh *<- Fail e <|> nack) ^=>. loop (available, given)
// an instance was inactive for too long
let expiredAlt() =
maybeExpiredCh ^=> fun instance ->
if DateTime.UtcNow - instance.LastUsed > inactiveTimeBeforeDispose
&& List.exists (fun x -> obj.ReferenceEquals(x, instance)) available then
dispose instance
loop (available |> List.filter (fun x -> not <| obj.ReferenceEquals(x, instance)), given)
else loop (available, given)
// the entire pool is disposing
let disposeAlt() =
doDispose ^=> fun _ ->
// dispose all instances that are in pool
available |> List.iter dispose
// wait until all given instances are returns to pool and disposing them on the way
Job.forN (int given) (releaseCh >>- dispose) >>=. hasDisposed *<= ()
if given < capacity then
// if number of given objects has not reached the capacity, synchronize on request channel as well
releaseAlt() <|> expiredAlt() <|> disposeAlt() <|> reqAlt()
else
releaseAlt() <|> expiredAlt() <|> disposeAlt()
do start (loop ([], 0u))
let get() = reqCh *<+->- fun replyCh nack -> (nack, replyCh)
/// Applies a function, that returns a Job, on an instance from pool. Returns `Alt` to consume
/// the function result.
member __.WithInstanceJobChoice (f: 'a -> #Job<Choice<'r, exn>>) : Alt<Choice<'r, exn>> =
get() ^=> function
| Ok entry ->
Job.tryFinallyJobDelay
<| fun _ -> f entry.Value
<| releaseCh *<- entry
| Fail e -> Job.result (Fail e)
interface IAsyncDisposable with
member __.DisposeAsync() = IVar.tryFill doDispose () >>=. hasDisposed
interface IDisposable with
/// Runs disposing asynchronously. Does not wait until the disposing finishes.
member x.Dispose() = (x :> IAsyncDisposable).DisposeAsync() |> start
Kotlin
As always with me and Kotlin at this time, I’m not sure the following code is totally idiomatic:
class ObjectPool<out T>(
createNew: () -> T,
capacity: Int = 50,
inactiveTimeBeforeDisposeMillis: Long = 10_000
) : AutoCloseable {
private class PoolEntry<T>(val value: T, var lastUsed: ZonedDateTime = ZonedDateTime.now()) : AutoCloseable {
override fun close() {
if (value is AutoCloseable)
try {
value.close()
} catch (_: Throwable) {
}
}
}
private val reqCh = Channel<Channel<PoolEntry<T>>>()
private val releaseCh = Channel<PoolEntry<T>>()
private val maybeExpiredCh = Channel<PoolEntry<T>>()
private val doCloseCh = Channel<Unit>()
private val hasClosed = Channel<Unit>()
private val available = Stack<PoolEntry<T>>()
private var given = 0
private fun <T> Stack<T>.tryPop() = if (empty()) null else pop()
private fun <T> close(x: T) = (x as? AutoCloseable)?.close()
init {
launch(CommonPool) {
whileSelect {
// An instance returns to pool.
releaseCh.onReceive { instance ->
instance.lastUsed = ZonedDateTime.now()
launch(CommonPool) {
delay(inactiveTimeBeforeDisposeMillis)
maybeExpiredCh.send(instance)
}
available.add(instance)
given--
true
}
// If number of given objects has not reached the capacity, synchronize on request channel as well.
if (given < capacity) {
// request for an instance
reqCh.onReceive { replyCh ->
replyCh.send(available.tryPop() ?: PoolEntry(createNew()))
given++
true
}
}
// An instance was inactive for too long time, dispose and remove it.
maybeExpiredCh.onReceive { instance ->
if (Duration.between(instance.lastUsed, ZonedDateTime.now()).toMillis() > inactiveTimeBeforeDisposeMillis
&& available.any { it === instance }) {
println("${instance.value} expired")
close(instance)
available.removeIf { it === instance }
}
true
}
// The entire pool is closing.
doCloseCh.onReceive {
// dispose all instances that are in pool
available.forEach { close(it) }
// wait until all given instances are returned to the pool and disposing them on the way
(1..given).forEach { close(releaseCh.receive()) }
hasClosed.send(Unit)
false
}
}
}
}
suspend fun withInstance(f: suspend (T) -> Unit) {
val replyCh = Channel<PoolEntry<T>>()
reqCh.send(replyCh)
val entry = replyCh.receive()
try {
f(entry.value)
} finally {
releaseCh.send(entry)
}
}
override fun close() = runBlocking {
doCloseCh.send(Unit)
val timeoutSeconds = 10L
select<Unit> {
hasClosed.onReceive {}
onTimeout(timeoutSeconds, TimeUnit.SECONDS) {
throw TimeoutException("${javaClass.name} has not closed in $timeoutSeconds s")
}
}
}
}
Usage:
fun main(args: Array<String>) = runBlocking {
val startTime = ZonedDateTime.now()
fun <T> ObjectPool<T>.useInstance(name: String) {
fun log(msg: String) =
println("%4d | %s | %s".format(Duration.between(startTime, ZonedDateTime.now()).toMillis(), name, msg))
launch(CommonPool) {
log("get...")
withInstance {
log("got $it!")
delay(1000)
}
log("released.")
}
}
var n = 0
ObjectPool<Int>({ n++ }, 2, 1000).use {
it.useInstance("1")
it.useInstance("2")
it.useInstance("3")
it.useInstance("4")
delay(5000)
}
Unit
}
Output
13 | 3 | get...
13 | 4 | get...
13 | 2 | get...
13 | 1 | get...
28 | 1 | got 0!
32 | 3 | got 1!
1042 | 1 | released.
1043 | 3 | released.
1043 | 2 | got 1!
1043 | 4 | got 0!
2043 | 2 | released.
2044 | 4 | released.
1 expired
0 expired
Kotlin’s version does not use the Either monad to track errors, it just uses exceptions, which I think is the proper way to do in any language which has built-in support for them. It doesn’t use negative acknowledgements either because they are not needed.
The structure of both programs are similar, but Kotlin’s one reads much easier thanks to deep coroutine integration and avoiding custom operators.