Actors benchmark: F# vs Kotlin
It turns out Kotlin has a full-fledged implementation of CML / Go / Hopac-style concurrency. It has synchronous channels, selective synchronization on send and receive (it even allows mixing both in the same select), buffered channels, etc.; read this article for details.
Let’s pick a simple benchmark from the Hopac repository and translate it into Kotlin.
F# (Hopac)
// Copyright (C) by Housemarque, Inc.
/// Application operator: `f ^ x` evaluates `f x`. Because `^` is right-associative in F#,
/// chains such as `ignore ^ run ^ job { ... }` need no parentheses.
let (^) f x = f x
/// Messages understood by the counter actor.
type Msg =
  /// Adds the given amount to the actor's running total.
  | Add of int64
  /// Asks for the current total, which the actor writes to the supplied `IVar`
  /// before resetting the total to zero.
  | GetAndReset of IVar<int64>
/// Handle to a counter actor: wraps the channel on which the actor receives `Msg` values.
type CounterActor = CA of Ch<Msg>
/// Job that starts a new counter actor and returns its handle.
///
/// The actor keeps a running `int64` total and handles the messages taken from its
/// channel one at a time: `Add n` increases the total, `GetAndReset` writes the current
/// total to the supplied reply variable and resets the total to zero.
let create : Job<CounterActor> = job {
  let inCh = Ch ()
  let total = ref 0L
  // Handles one message; both branches yield a job so the server loop can bind the result.
  let handle msg =
    match msg with
    | Add n ->
      total.Value <- total.Value + n
      Job.unit ()
    | GetAndReset replyVar ->
      let was = total.Value
      total.Value <- 0L
      replyVar *<= was
  do! Job.foreverServer (inCh >>= handle)
  return CA inCh
}
/// Returns the Hopac operation that sends `Add n` on the actor's channel.
/// Nothing is sent until the operation is run (the benchmark runs it via `Job.forN`).
let add (CA inCh) (n: int64) =
  inCh *<- Add n
/// Query that sends `GetAndReset` on the actor's channel and yields the actor's reply,
/// upcast to a plain `Job<_>`.
let getAndReset (CA inCh) =
  let query = inCh *<+=>- GetAndReset
  query :> Job<_>
/// Runs one benchmark round and prints its throughput.
///
/// For every logical processor, `Seq.Con.iterJob` runs a job that performs
/// `add actor 100L` `numPerThread` times against a freshly created actor; a final
/// `getAndReset` is awaited before the clock is read.
let run numPerThread =
  printf "ChMsg: "
  let timer = Stopwatch.StartNew ()
  let workload = job {
    let! actor = create
    do! seq {1 .. Environment.ProcessorCount}
        |> Seq.Con.iterJob
            (fun _ -> Job.forN numPerThread (add actor 100L))
    return! getAndReset actor
  }
  // This binding is not recursive, so `run` below refers to an earlier definition
  // (presumably Hopac's job runner), not to this function.
  ignore ^ run workload
  let d = timer.Elapsed
  let totalMsgs = float (Environment.ProcessorCount * numPerThread)
  printf "%d * %8d msgs => %8.0f msgs/s\n"
    Environment.ProcessorCount numPerThread
    (totalMsgs / d.TotalSeconds)
// Runs the benchmark for increasing per-producer message counts.
// NOTE(review): indentation was lost in this listing — the three GC.Collect(2) calls
// presumably sit inside the loop body together with `run n`; confirm against the original.
for n in [300; 3000; 30000; 300000; 3000000] do
run n
GC.Collect(2)
GC.Collect(2)
GC.Collect(2)
Output:
ChMsg: 8 * 300 msgs => 75259 msgs/s
ChMsg: 8 * 3000 msgs => 4393351 msgs/s
ChMsg: 8 * 30000 msgs => 4591289 msgs/s
ChMsg: 8 * 300000 msgs => 4620537 msgs/s
ChMsg: 8 * 3000000 msgs => 4721725 msgs/s
F# (MailboxProcessor)
This version uses the built-in actor-like primitive named MailboxProcessor; it’s a combination of a message queue and an async loop that processes the messages in a non-blocking manner.
/// Messages understood by the `MailboxProcessor`-based counter actor.
type Msg =
    /// Adds the given amount to the actor's running total.
    | Add of int64
    /// Asks for the current total, which the actor sends through the supplied reply
    /// channel before resetting the total to zero.
    | GetAndReset of AsyncReplyChannel<int64>
/// Starts a `MailboxProcessor` that keeps a running `int64` total.
/// `Add n` increases the total; `GetAndReset` replies with the current total and then
/// resets it to zero.
let create() =
    // Message loop executed by the mailbox: receive, update the total, repeat forever.
    let body (inbox: MailboxProcessor<Msg>) =
        async {
            let mutable total = 0L
            while true do
                let! msg = inbox.Receive()
                match msg with
                | Add n -> total <- total + n
                | GetAndReset reply ->
                    let snapshot = total
                    total <- 0L
                    reply.Reply snapshot
        }
    MailboxProcessor.Start(body)
/// Posts `Add n` to the actor's mailbox.
let add (mp: MailboxProcessor<Msg>) (n: int64) =
    Add n |> mp.Post
/// Posts `GetAndReset` to the actor and waits for its reply; returns the replied value.
let getAndReset (mp: MailboxProcessor<Msg>) =
    mp.PostAndReply(fun replyChannel -> GetAndReset replyChannel)
/// Runs one benchmark round against a fresh `MailboxProcessor` actor and prints its throughput.
///
/// One async workflow per logical processor posts `Add 100L` `numPerThread` times; the
/// workflows are run with `Async.Parallel`, then a final `getAndReset` waits for the
/// actor's reply before the clock is read.
let run numPerThread =
    // Print the label before starting the clock so console output is not part of the
    // measured interval (same ordering as the Hopac benchmark).
    printf "MailboxProcessor: "
    let timer = Stopwatch.StartNew()
    let actor = create()
    [1..Environment.ProcessorCount]
    |> List.map (fun _ ->
        async {
            for _ in 1..numPerThread do
                add actor 100L
        })
    |> Async.Parallel
    |> Async.RunSynchronously
    |> ignore
    // The reply is only awaited so that all previously posted messages have been handled.
    getAndReset actor |> ignore
    let d = timer.Elapsed
    printf "%d * %8d msgs => %8.0f msgs/s\n"
        Environment.ProcessorCount numPerThread
        (float (Environment.ProcessorCount * numPerThread) / d.TotalSeconds)
// Runs the benchmark for increasing per-producer message counts.
// NOTE(review): indentation was lost in this listing — the three GC.Collect(2) calls
// presumably sit inside the loop body together with `run n`; confirm against the original.
for n in [300; 3000; 30000; 300000; 3000000] do
run n
GC.Collect(2)
GC.Collect(2)
GC.Collect(2)
Output:
8 * 300 msgs => 224431 msgs/s
8 * 3000 msgs => 1480677 msgs/s
8 * 30000 msgs => 1138195 msgs/s
8 * 300000 msgs => 1074774 msgs/s
8 * 3000000 msgs => 1159185 msgs/s
Kotlin (plain actor)
/** Messages accepted by the counter actor. */
sealed class Msg

/** Adds [n] to the actor's running total. */
class Add(val n: Long) : Msg()

/** Requests the current total, which the actor sends on [reply] before resetting the total to zero. */
class GetAndReset(val reply: SendChannel<Long>) : Msg()
/**
 * Starts a counter actor on [CommonPool].
 *
 * The actor handles the messages from its channel one by one: [Add] increases the running
 * total, [GetAndReset] sends the current total to the message's reply channel and resets
 * the total to zero.
 */
fun create() = actor<Msg>(CommonPool) {
    var total = 0L
    for (msg in channel) {
        when (msg) {
            is Add -> total += msg.n
            is GetAndReset -> {
                val snapshot = total
                total = 0L
                msg.reply.send(snapshot)
            }
        }
    }
}
/**
 * Sends a [GetAndReset] carrying a fresh reply channel to [actor] and suspends until the
 * reply arrives.
 *
 * @return the value the actor sent on the reply channel
 */
suspend fun getAndReset(actor: ActorJob<Msg>): Long =
    Channel<Long>().let { replyChannel ->
        actor.send(GetAndReset(replyChannel))
        replyChannel.receive()
    }
/**
 * Runs one benchmark round and prints its throughput.
 *
 * One coroutine per available processor is started on [CommonPool]; each sends
 * `Add(100L)` to a fresh actor [numPerThread] times and then calls [getAndReset].
 * The round is timed with [measureTimeMillis].
 *
 * @param numPerThread number of `Add` messages each producer coroutine sends
 */
suspend fun run(numPerThread: Int) {
    val processorCount = Runtime.getRuntime().availableProcessors()
    val elapsed = measureTimeMillis {
        val actor = create()
        try {
            (1..processorCount).map {
                async(CommonPool) {
                    repeat(numPerThread) { actor.send(Add(100L)) }
                    getAndReset(actor)
                }
            }.forEach { it.await() }
        } finally {
            // Close the mailbox so the actor's receive loop ends; otherwise every round
            // leaves a coroutine parked on the channel forever.
            actor.close()
        }
    }
    // Multiply as Long so large message counts cannot overflow Int.
    val totalMessages = processorCount.toLong() * numPerThread
    print("%d * %8d msgs => %8.0f msgs/s\n"
        .format(processorCount, numPerThread, totalMessages / (elapsed / 1000.0)))
}
/** Runs the benchmark once for each per-producer message count, smallest first. */
fun main(args: Array<String>) = runBlocking {
    listOf(300, 3_000, 30_000, 300_000, 3_000_000).forEach { messagesPerProducer ->
        run(messagesPerProducer)
    }
}
Output:
8 * 300 msgs => 60000 msgs/s
8 * 3000 msgs => 470588 msgs/s
8 * 30000 msgs => 1012658 msgs/s
8 * 300000 msgs => 1355167 msgs/s
8 * 3000000 msgs => 1316222 msgs/s
This version uses the actor function, which simplifies things a bit. If you are curious what such an actor would look like when implemented from scratch, here it is:
/**
 * Hand-rolled equivalent of the `actor` builder: creates a channel and starts a coroutine
 * on [CommonPool] that handles the messages sent to it.
 *
 * [Add] increases the running total; [GetAndReset] sends the current total on the
 * message's reply channel and resets the total to zero.
 *
 * @return the channel on which the actor receives its messages
 */
fun create(): Channel<Msg> {
    val ch = Channel<Msg>()
    // `launch`, not `async`: no result is produced and nobody awaits one, and an exception
    // stored in a discarded Deferred would never be observed.
    launch(CommonPool) {
        var state = 0L
        for (msg in ch) {
            when (msg) {
                is Add -> state += msg.n
                is GetAndReset -> {
                    val was = state
                    state = 0L
                    msg.reply.send(was)
                }
            }
        }
    }
    return ch
}
Kotlin (select from channels)
/**
 * Counter actor built on `select`: a coroutine launched on [CommonPool] waits on two
 * channels at once and updates a running total.
 *
 * A value received on [add] is added to the total. A reply channel received on
 * [getAndReset] gets the current total sent to it, after which the total is reset to zero.
 */
class Actor {
    // Only touched from the coroutine launched in `init`.
    private var state = 0L
    private val addCh = Channel<Long>()
    private val getAndResetCh = Channel<SendChannel<Long>>()

    // Expose only the `SendChannel` side: client code should not be able to receive our messages.
    // A declared property type performs the up-cast; no `as` is needed.
    val add: SendChannel<Long> = addCh
    val getAndReset: SendChannel<SendChannel<Long>> = getAndResetCh

    init {
        launch(CommonPool) {
            while (true) {
                select<Unit> {
                    addCh.onReceive { amount ->
                        state += amount
                    }
                    getAndResetCh.onReceive { reply ->
                        val was = state
                        state = 0L
                        reply.send(was)
                    }
                }
            }
        }
    }
}
/**
 * Sends a fresh reply channel on the actor's `getAndReset` channel and suspends until a
 * value arrives on it.
 *
 * @return the value received on the reply channel
 */
suspend fun Actor.getAndReset(): Long {
    val reply = Channel<Long>()
    getAndReset.send(reply)
    return reply.receive()
}
/**
 * Runs one benchmark round against a `select`-based [Actor] and prints its throughput.
 *
 * One coroutine per available processor is launched on [CommonPool]; each performs
 * [numPerThread] selects that either send `100L` on the actor's `add` channel or hit the
 * timeout clause, and then calls `getAndReset`. The round is timed with [measureTimeMillis].
 *
 * @param numPerThread number of send attempts each producer coroutine makes
 */
suspend fun run(numPerThread: Int) {
    val processorCount = Runtime.getRuntime().availableProcessors()
    val elapsed = measureTimeMillis {
        val actor = Actor()
        val producers = (1..processorCount).map {
            launch(CommonPool) {
                repeat(numPerThread) {
                    select<Unit> {
                        actor.add.onSend(100L) {}
                        // just to show how timeouts can be added in a composable way
                        onTimeout(100) { println("Adding to the actor timed out!") }
                    }
                }
                actor.getAndReset()
            }
        }
        for (producer in producers) producer.join()
    }
    val throughput = processorCount * numPerThread / (elapsed / 1000.0)
    print("%d * %8d msgs => %8.0f msgs/s\n".format(processorCount, numPerThread, throughput))
}
/** Runs the benchmark once for each per-producer message count, smallest first. */
fun main(args: Array<String>) = runBlocking {
    listOf(300, 3_000, 30_000, 300_000, 3_000_000).forEach { messagesPerProducer ->
        run(messagesPerProducer)
    }
}
Output:
8 * 300 msgs => 26966 msgs/s
8 * 3000 msgs => 156863 msgs/s
8 * 30000 msgs => 342857 msgs/s
8 * 300000 msgs => 776950 msgs/s
8 * 3000000 msgs => 763310 msgs/s
It’s been a while since I last used Hopac, and I had almost forgotten how hard it is. I find the Kotlin and MailboxProcessor versions tremendously easier to read, and they were a pleasure to write.
So, Kotlin’s actor is ~3.6 times slower than Hopac and ~10% faster than MailboxProcessor.
select introduces significant overhead: it’s ~40% slower than actor.
