Skip to content

Commit

Permalink
Added support for delay
Browse files Browse the repository at this point in the history
  • Loading branch information
mfateev committed Jun 20, 2023
1 parent 4ed8723 commit a61af8b
Show file tree
Hide file tree
Showing 6 changed files with 18 additions and 55 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ class KotlinWorkflow(
contextPropagators
)

private val dispatcher = TemporalCoroutineDispatcher()
private val dispatcher = TemporalCoroutineDispatcher(workflowContext)
private val coroutineDispatcher = TemporalCallbackCoroutineDispatcher(dispatcher)
private val scope = TemporalScope(workflowContext)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -237,10 +237,6 @@ class KotlinWorkflowContext(
TODO("Not yet implemented")
}

override suspend fun sleep(duration: Duration) {
TODO("Not yet implemented")
}

override suspend fun await(timeout: Duration, reason: String?, unblockCondition: Supplier<Boolean?>): Boolean {
TODO("Not yet implemented")
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,20 +25,18 @@ import kotlinx.coroutines.CoroutineDispatcher
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Delay
import kotlinx.coroutines.InternalCoroutinesApi
import java.lang.RuntimeException
import java.time.Duration
import java.util.*
import java.util.concurrent.DelayQueue
import java.util.concurrent.Delayed
import java.util.concurrent.TimeUnit
import kotlin.coroutines.AbstractCoroutineContextElement
import kotlin.coroutines.CoroutineContext

@Suppress("UNUSED_PARAMETER")
@OptIn(InternalCoroutinesApi::class)
class TemporalCoroutineDispatcher : CoroutineDispatcher(), Delay {
class TemporalCoroutineDispatcher(val workflowContext: KotlinWorkflowContext) : CoroutineDispatcher(), Delay {

private val queue: java.util.Queue<Runnable> = LinkedList()
private val callbackQueue: Queue<Runnable> = LinkedList()
private val delayQueue: DelayQueue<DelayedContinuation> = DelayQueue()

override fun dispatch(context: CoroutineContext, block: Runnable) {
queue.add(block)
Expand All @@ -49,13 +47,7 @@ class TemporalCoroutineDispatcher : CoroutineDispatcher(), Delay {
}

// TODO: deadlock detector
fun eventLoop(defaultDeadlockDetectionTimeout: Long): Boolean {
// println("eventLoop begin")
if (isDone()) {
println("eventLoop completed")
return false
}

fun eventLoop(defaultDeadlockDetectionTimeout: Long) {
while (callbackQueue.isNotEmpty()) {
val block = callbackQueue.poll()
block.run()
Expand All @@ -65,39 +57,16 @@ class TemporalCoroutineDispatcher : CoroutineDispatcher(), Delay {
val block = queue.poll()
block.run()
}

while (true) {
// println("delayedContinuation while begin count=" + delayQueue.size)

val delayedContinuation = delayQueue.poll() ?: break
println("delayedContinuation returned")
with(delayedContinuation.continuation) { resumeUndispatched(Unit) }
}

return true
}

fun isDone() = queue.isEmpty() && callbackQueue.isEmpty() && delayQueue.isEmpty()

override fun scheduleResumeAfterDelay(timeMillis: Long, continuation: CancellableContinuation<Unit>) {
println("scheduleResumeAfterDelay delay=$timeMillis")
delayQueue.add(DelayedContinuation(timeMillis, continuation))
}

private class DelayedContinuation(
private val delayTime: Long,
val continuation: CancellableContinuation<Unit>
) : Delayed {
private val startTime = System.currentTimeMillis() + delayTime

override fun compareTo(other: Delayed): Int {
return (getDelay(TimeUnit.MILLISECONDS) - other.getDelay(TimeUnit.MILLISECONDS)).toInt()
}

override fun getDelay(unit: TimeUnit): Long {
val diff = startTime - System.currentTimeMillis()
return unit.convert(diff, TimeUnit.MILLISECONDS)
}
val cancellationHandler =
workflowContext.replayContext!!.newTimer(Duration.ofMillis(timeMillis)) { cancellationRequest ->
cancellationRequest ?: callbackQueue.add {
with(continuation) { resumeUndispatched(Unit) }
}
}
continuation.invokeOnCancellation { cause -> cancellationHandler.apply(cause as RuntimeException?) }
}
}

Expand All @@ -118,11 +87,10 @@ class TemporalCallbackCoroutineDispatcher(val dispatcher: TemporalCoroutineDispa
}

internal class TemporalScope(private val workflowContext: KotlinWorkflowContext) : CoroutineScope {
// TODO: Add argument to the Temporal context.
override val coroutineContext: CoroutineContext = TemporalCoroutineContext(workflowContext)

// CoroutineScope is used intentionally for user-friendly representation
override fun toString(): String = "CoroutineScope(coroutineContext=$coroutineContext)"
override fun toString(): String = "TemporalScope(coroutineContext=$coroutineContext)"
}

class TemporalCoroutineContext(val workflowContext: KotlinWorkflowContext) :
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,6 @@ interface WorkflowOutboundCallsInterceptor {
fun cancelWorkflow(input: CancelWorkflowInput): CancelWorkflowOutput

// TODO: Consider removing sleep and keep only built in delay
suspend fun sleep(duration: Duration)
suspend fun await(timeout: Duration, reason: String?, unblockCondition: Supplier<Boolean?>): Boolean
suspend fun await(reason: String?, unblockCondition: Supplier<Boolean?>)
fun <R> sideEffect(resultClass: Class<R>, resultType: Type, func: Func<R?>): R?
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,10 +55,6 @@ class WorkflowOutboundCallsInterceptorBase(private val next: WorkflowOutboundCal
return next.cancelWorkflow(input)
}

override suspend fun sleep(duration: Duration) {
return next.sleep(duration)
}

override suspend fun await(timeout: Duration, reason: String?, unblockCondition: Supplier<Boolean?>): Boolean {
return next.await(timeout, reason, unblockCondition)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import io.temporal.serviceclient.WorkflowServiceStubs
import io.temporal.worker.KotlinWorkerFactory
import kotlinx.coroutines.async
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import org.slf4j.LoggerFactory
import java.time.Duration

Expand Down Expand Up @@ -155,7 +156,10 @@ object HelloKotlinWorkflow {
)

val result1 = async { activities.execute("greet", String::class.java, "Hello", name!!) }
val result2 = async { activities.execute("greet", String::class.java, "Bye", name!!) }
val result2 = async {
delay(1000)
activities.execute("greet", String::class.java, "Bye", name!!)
}
return@coroutineScope result1.await()!! + "\n" + result2.await()!!
}
}
Expand Down

0 comments on commit a61af8b

Please sign in to comment.