Scheduler in F# and Kotlin

Let’s explore Kotlin’s coroutines a bit deeper. How is the cancellation done?

Pull another victim from FsSnip site - Agent Based Scheduler, realize that it’s super overcomplicated: we don’t need an actor to start an async computation, so we don’t need messages to pack arguments to send them to the actor. What’s left? Just a function that spins up an async:

let schedule<'a> (msg: 'a) (initialDelay: TimeSpan option) (delayBetween: TimeSpan option) (ct: CancellationToken) (receiver: 'a -> unit) =
    let computation = async {
        match initialDelay with
        | Some delay -> do! Async.Sleep (int delay.TotalMilliseconds)
        | _ -> ()

        match delayBetween with
        | None -> if not ct.IsCancellationRequested then receiver msg
        | Some delay ->
            while not ct.IsCancellationRequested do
                receiver msg
                do! Async.Sleep (int delay.TotalMilliseconds)

    }
    Async.Start(computation, ct)

[<EntryPoint>]
let main _ = Async.RunSynchronously <| async {
    use cts = new CancellationTokenSource()
    schedule 25 None (Some (TimeSpan.FromSeconds 1.)) cts.Token <| fun msg ->
        printfn "Scheduler got %A" msg

    do! Async.Sleep 5000
    cts.Cancel()
    printfn "cancelled"
    do! Async.Sleep 5000
    return 0
}

output

Scheduler got 25
Scheduler got 25
Scheduler got 25
Scheduler got 25
Scheduler got 25
cancelled

Quite a clean, easily readable code… yes?

Let’s port it into Kotlin.

suspend fun <T> schedule(ctx: CoroutineContext = CommonPool, msg: T, initialDelay: Duration? = null,
                         delayBetween: Duration? = null, receiver: (T) -> Unit): Job =
    launch(ctx) {
        initialDelay?.let { delay(it.toMillis()) }

        if (delayBetween == null) {
            if (isActive) receiver(msg)
        } else {
            while (isActive) {
                receiver(msg)
                delay(delayBetween.toMillis())
            }
        }
    }

fun main(args: Array<String>) = runBlocking {
    val job = schedule(CommonPool, 25, delayBetween = Duration.ofSeconds(1)) {
        println("Scheduler got $it")
    }

    delay(5000)
    job.cancel()
    println("cancelled")
    delay(5000)
}

output

Scheduler got 25
Scheduler got 25
Scheduler got 25
Scheduler got 25
Scheduler got 25
cancelled
val job = Job()

schedule(CommonPool + job, 25, delayBetween = Duration.ofSeconds(1)) {
    println("Scheduler1 got $it")
}

schedule(CommonPool + job, 27, delayBetween = Duration.ofSeconds(2)) {
    println("Scheduler2 got $it")
}

delay(5000)
job.cancel()
println("cancelled")
delay(5000)

output

Scheduler1 got 25
Scheduler2 got 27
Scheduler1 got 25
Scheduler2 got 27
Scheduler1 got 25
Scheduler1 got 25
Scheduler2 got 27
Scheduler1 got 25
cancelled