Skip to content

Commit

Permalink
HelloKotlinWorkflow works
Browse files Browse the repository at this point in the history
  • Loading branch information
mfateev committed Jun 20, 2023
1 parent a40e569 commit 4ed8723
Show file tree
Hide file tree
Showing 26 changed files with 280 additions and 141 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.temporal.internal.async

import io.temporal.kotlin.interceptors.WorkflowInboundCallsInterceptor
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.temporal.internal.async

import io.temporal.api.common.v1.Payloads
Expand All @@ -39,7 +40,7 @@ internal class DynamicKotlinWorkflowDefinition(
private var workflowInvoker: WorkflowInboundCallsInterceptor? = null

override suspend fun initialize() {
val workflowContext: KotlinWorkflowContext = KotlinWorkflowInternal.rootWorkflowContext
val workflowContext: KotlinWorkflowContext = KotlinWorkflowInternal.getRootWorkflowContext()
workflowInvoker = RootWorkflowInboundCallsInterceptor(workflowContext)
for (workerInterceptor in workerInterceptors) {
workflowInvoker = workerInterceptor.interceptWorkflow(workflowInvoker!!)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.temporal.internal.async

import java.lang.reflect.Type
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.temporal.internal.async

import io.temporal.activity.ActivityOptions
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,23 @@
/*
* Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved.
*
* Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Modifications copyright (C) 2017 Uber Technologies, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this material except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.temporal.internal.async

import io.temporal.api.common.v1.Payloads
Expand Down Expand Up @@ -101,7 +121,7 @@ class KotlinWorkflow(
return false
}
dispatcher.eventLoop(defaultDeadlockDetectionTimeout)
return dispatcher.isDone() || executionHandler!!.isDone // Do not wait for all other threads.
return /*dispatcher.isDone() ||*/ executionHandler!!.isDone // Do not wait for all other threads.
}

override fun getOutput(): Optional<Payloads> {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,23 @@
/*
* Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved.
*
* Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Modifications copyright (C) 2017 Uber Technologies, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this material except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.temporal.internal.async

import com.uber.m3.tally.Scope
Expand Down Expand Up @@ -57,6 +77,8 @@ class KotlinWorkflowContext(
private var activityOptionsMap: Map<String, ActivityOptions>? = null
private var localActivityOptionsMap: Map<String, LocalActivityOptions>? = null

private var replayContext: ReplayWorkflowContext? = null

init {
if (workflowImplementationOptions != null) {
defaultActivityOptions = workflowImplementationOptions!!.defaultActivityOptions
Expand Down Expand Up @@ -88,7 +110,7 @@ class KotlinWorkflowContext(
replayContext = context
}

override fun getReplayContext(): ReplayWorkflowContext {
override fun getReplayContext(): ReplayWorkflowContext? {
return replayContext
}

Expand Down Expand Up @@ -142,7 +164,7 @@ class KotlinWorkflowContext(
override fun <R : Any?> getLastCompletionResult(resultClass: Class<R>, resultType: Type): R? {
return dataConverter.fromPayloads(
0,
Optional.ofNullable(replayContext.lastCompletionResult),
Optional.ofNullable(replayContext!!.lastCompletionResult),
resultClass,
resultType
)
Expand All @@ -157,7 +179,7 @@ class KotlinWorkflowContext(
return HashMap()
}

val headerData: Map<String, Payload> = HashMap(replayContext.header)
val headerData: Map<String, Payload> = HashMap(replayContext!!.header)
val contextData: MutableMap<String, Any> = HashMap()
for (propagator in contextPropagators) {
contextData[propagator.name] = propagator.deserializeContext(headerData)
Expand All @@ -168,13 +190,13 @@ class KotlinWorkflowContext(

override suspend fun <R> executeActivity(input: WorkflowOutboundCallsInterceptor.ActivityInput<R>): WorkflowOutboundCallsInterceptor.ActivityOutput<R?> {
val serializationContext = ActivitySerializationContext(
replayContext.namespace,
replayContext.workflowId,
replayContext.workflowType.name,
replayContext!!.namespace,
replayContext!!.workflowId,
replayContext!!.workflowType.name,
input.activityName,
// input.getOptions().getTaskQueue() may be not specified, workflow task queue is used
// by the Server in this case
if (input.options.taskQueue != null) input.options.taskQueue else replayContext.taskQueue,
if (input.options.taskQueue != null) input.options.taskQueue else replayContext!!.taskQueue,
false
)
val dataConverterWithActivityContext = dataConverter.withContext(serializationContext)
Expand Down Expand Up @@ -205,7 +227,7 @@ class KotlinWorkflowContext(
TODO("Not yet implemented")
}

override fun newRandom(): Random = replayContext.newRandom()
override fun newRandom(): Random = replayContext!!.newRandom()

override suspend fun signalExternalWorkflow(input: WorkflowOutboundCallsInterceptor.SignalExternalInput): WorkflowOutboundCallsInterceptor.SignalExternalOutput {
TODO("Not yet implemented")
Expand Down Expand Up @@ -290,7 +312,7 @@ class KotlinWorkflowContext(
}

val metricScope: Scope
get() = replayContext.metricsScope
get() = replayContext!!.metricsScope

@OptIn(ExperimentalCoroutinesApi::class)
private suspend fun executeActivityOnce(
Expand All @@ -303,7 +325,7 @@ class KotlinWorkflowContext(

return suspendCancellableCoroutine { continuation ->
var activityId: String? = null
val activityOutput = replayContext.scheduleActivityTask(
val activityOutput = replayContext!!.scheduleActivityTask(
params
) { output: Optional<Payloads>, failure: Failure? ->
if (failure == null) {
Expand All @@ -329,7 +351,7 @@ class KotlinWorkflowContext(
): ExecuteActivityParameters {
var taskQueue = options.taskQueue
if (taskQueue == null) {
taskQueue = replayContext.taskQueue
taskQueue = replayContext!!.taskQueue
}
val attributes = ScheduleActivityTaskCommandAttributes.newBuilder()
.setActivityType(ActivityType.newBuilder().setName(name))
Expand All @@ -345,7 +367,7 @@ class KotlinWorkflowContext(
)
.setHeartbeatTimeout(ProtobufTimeUtils.toProtoDuration(options.heartbeatTimeout))
.setRequestEagerExecution(
!options.isEagerExecutionDisabled && (taskQueue == replayContext.taskQueue)
!options.isEagerExecutionDisabled && (taskQueue == replayContext!!.taskQueue)
)
input.ifPresent { value: Payloads? ->
attributes.input = value
Expand All @@ -366,7 +388,7 @@ class KotlinWorkflowContext(
if (options.versioningIntent != null) {
attributes.useCompatibleVersion = options
.versioningIntent
.determineUseCompatibleFlag(replayContext.taskQueue == options.taskQueue)
.determineUseCompatibleFlag(replayContext!!.taskQueue == options.taskQueue)
}
return ExecuteActivityParameters(attributes, options.cancellationType)
}
Expand Down Expand Up @@ -396,7 +418,7 @@ class KotlinWorkflowContext(
* thread and should be replaced with another specific implementation during initialization stage
* `workflow.initialize()` performed inside the workflow root thread.
*
* @see SyncWorkflow.start
* @see KotlinWorkflow.start
*/
private class InitialWorkflowInboundCallsInterceptor(workflowContext: KotlinWorkflowContext) :
BaseRootKotlinWorkflowInboundCallsInterceptor(workflowContext) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,23 @@
/*
* Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved.
*
* Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Modifications copyright (C) 2017 Uber Technologies, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this material except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.temporal.internal.async

import io.temporal.api.common.v1.Payloads
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.temporal.internal.async

import io.temporal.api.common.v1.Payloads
Expand Down Expand Up @@ -106,7 +107,7 @@ internal class KotlinWorkflowExecutionHandler(

private fun throwAndFailWorkflowExecution(exception: Throwable) {
val replayWorkflowContext = context.getReplayContext()
val fullReplayDirectQueryName = replayWorkflowContext.fullReplayDirectQueryName
val fullReplayDirectQueryName = replayWorkflowContext!!.fullReplayDirectQueryName
val info = Workflow.getInfo()
if (fullReplayDirectQueryName != null) {
if (log.isDebugEnabled &&
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,10 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.temporal.internal.async

import com.google.common.base.Preconditions
import com.google.common.collect.ImmutableSet
import io.temporal.api.common.v1.Payloads
import io.temporal.api.common.v1.WorkflowExecution
import io.temporal.api.common.v1.WorkflowType
Expand All @@ -32,7 +32,6 @@ import io.temporal.common.metadata.POJOWorkflowImplMetadata
import io.temporal.common.metadata.POJOWorkflowInterfaceMetadata
import io.temporal.common.metadata.WorkflowMethodType
import io.temporal.failure.CanceledFailure
import io.temporal.internal.common.env.ReflectionUtils
import io.temporal.internal.replay.ReplayWorkflow
import io.temporal.internal.replay.ReplayWorkflowFactory
import io.temporal.internal.sync.WorkflowInternal
Expand All @@ -54,6 +53,8 @@ import org.slf4j.LoggerFactory
import java.lang.reflect.InvocationTargetException
import java.lang.reflect.Method
import java.util.*
import kotlin.reflect.full.callSuspend
import kotlin.reflect.jvm.kotlinFunction

class KotlinWorkflowImplementationFactory(
clientOptions: WorkflowClientOptions,
Expand Down Expand Up @@ -281,9 +282,11 @@ class KotlinWorkflowImplementationFactory(
// don't pass it down to other classes, it's a "cached" instance for internal usage only
private val dataConverterWithWorkflowContext: DataConverter
) : KotlinWorkflowDefinition {

private var workflowInvoker: WorkflowInboundCallsInterceptor? = null

override suspend fun initialize() {
val workflowContext = KotlinWorkflowInternal.rootWorkflowContext
val workflowContext = KotlinWorkflowInternal.getRootWorkflowContext()
workflowInvoker = RootWorkflowInboundCallsInterceptor(workflowContext)
for (workerInterceptor in workerInterceptors) {
workflowInvoker = workerInterceptor.interceptWorkflow(workflowInvoker!!)
Expand All @@ -296,8 +299,9 @@ class KotlinWorkflowImplementationFactory(
override suspend fun execute(header: Header?, input: Payloads?): Payloads? {
val args = dataConverterWithWorkflowContext.fromPayloads(
Optional.ofNullable(input),
workflowMethod.parameterTypes,
workflowMethod.genericParameterTypes
// TODO(maxim): Validate that the last element is coroutine continuation
workflowMethod.parameterTypes.dropLast(1).toTypedArray(),
workflowMethod.genericParameterTypes.dropLast(1).toTypedArray()
)
Preconditions.checkNotNull(workflowInvoker, "initialize not called")
val result = workflowInvoker!!.execute(WorkflowInboundCallsInterceptor.WorkflowInput(header, args))
Expand All @@ -322,7 +326,8 @@ class KotlinWorkflowImplementationFactory(

override suspend fun execute(input: WorkflowInboundCallsInterceptor.WorkflowInput): WorkflowInboundCallsInterceptor.WorkflowOutput {
return try {
val result = workflowMethod.invoke(workflow, *input.arguments)
val kMethod = workflowMethod.kotlinFunction
val result = kMethod!!.callSuspend(workflow, *input.arguments)
WorkflowInboundCallsInterceptor.WorkflowOutput(result)
} catch (e: IllegalAccessException) {
throw CheckedExceptionWrapper.wrap(e)
Expand Down Expand Up @@ -381,23 +386,25 @@ class KotlinWorkflowImplementationFactory(

companion object {
private val log = LoggerFactory.getLogger(KotlinWorkflowImplementationFactory::class.java)
val WORKFLOW_HANDLER_STACKTRACE_CUTOFF = ImmutableSet.builder<String>() // POJO
.add(
ReflectionUtils.getMethodNameForStackTraceCutoff(
KotlinWorkflowImplementation::class.java,
"execute",
Header::class.java,
Optional::class.java
)
) // Dynamic
.add(
ReflectionUtils.getMethodNameForStackTraceCutoff(
DynamicKotlinWorkflowDefinition::class.java,
"execute",
Header::class.java,
Optional::class.java
)
)
.build()

// TODO(maxim): See if this is needed for Kotlin
val WORKFLOW_HANDLER_STACKTRACE_CUTOFF = 0 // ImmutableSet.builder<String>() // POJO
// .add(
// ReflectionUtils.getMethodNameForStackTraceCutoff(
// KotlinWorkflowImplementation::class.java,
// "execute",
// Header::class.java,
// Payloads::class.java
// )
// ) // Dynamic
// .add(
// ReflectionUtils.getMethodNameForStackTraceCutoff(
// DynamicKotlinWorkflowDefinition::class.java,
// "execute",
// Header::class.java,
// Payloads::class.java
// )
// )
// .build()
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.temporal.internal.async

import io.temporal.api.common.v1.SearchAttributes
Expand Down

0 comments on commit 4ed8723

Please sign in to comment.