Class/Context factory support for ScriptEngine #734

Open
vepo opened this issue Jun 2, 2023 · 5 comments
Labels
question Further information is requested


@vepo

vepo commented Jun 2, 2023

Description

I'm replacing Nashorn with GraalJS as the hook-point engine in a Kafka Streams framework. It works fine for some of our extension points, but we have one extension point that configures Kafka Streams processing steps, and that one fails.

This is the code we are using to test our implementation:

```javascript
function showEngine() {
    print("====================================");
    if (typeof Graal !== 'undefined') {
        print("[ENGINE] Using GraalVM.js");
        print(Graal.versionJS);
        print(Graal.versionGraalVM);
        print(Graal.isGraalRuntime());
    } else {
        print("[ENGINE] Using Nashorn");
    }
}

function onCustom(_stream, _properties) {
    return _stream
        .peek(function (_key, _value) {
            showEngine();
            print("====   PEEK BEFORE    ====");
            print("[PEEK BEFORE ] KEY   = " + _key);
            print("[PEEK BEFORE ] VALUE = " + _value);
        })
        .filter(function (_key, _value) {
            showEngine();
            print("====   FILTER          ====");
            print("[FILTER      ] KEY   = " + _key);
            print("[FILTER      ] VALUE = " + _value);
            print("[FILTER      ] FILTER= " + (_value.get('type').toString() == 'A'));
            return _value.get('type').toString() == 'A';
        })
        .peek(function (_key, _value) {
            showEngine();
            print("====    PEEK AFTER    ====");
            print("[PEEK AFTER  ] KEY   = " + _key);
            print("[PEEK AFTER  ] VALUE = " + _value);
        });
}
```

Our issue is that, after the script has been executed, Kafka Streams creates its own threads, and those threads call the objects created inside the JavaScript code. We then get the exception below because more than one thread accesses the same context simultaneously.

org.apache.kafka.streams.errors.StreamsException: Exception caught in process. taskId=0_2, processor=KSTREAM-SOURCE-0000000000, topic=streams-input, partition=2, offset=24, stacktrace=java.lang.IllegalStateException: Multi threaded access requested by thread         StreamsThread threadId: ngm_streamer_APP-69bb8cc1-3444-4b6f-ba2f-2fdb8666a8b2-StreamThread-3
TaskManager
        MetadataState:
        Tasks:
                0_2 RUNNING StreamTask(active) but is not allowed for language(s) js.
        at com.oracle.truffle.polyglot.PolyglotEngineException.illegalState(PolyglotEngineException.java:129)
        at com.oracle.truffle.polyglot.PolyglotContextImpl.throwDeniedThreadAccess(PolyglotContextImpl.java:941)
        at com.oracle.truffle.polyglot.PolyglotContextImpl.checkAllThreadAccesses(PolyglotContextImpl.java:800)
        at com.oracle.truffle.polyglot.PolyglotContextImpl.enterThreadChanged(PolyglotContextImpl.java:630)
        at com.oracle.truffle.polyglot.PolyglotEngineImpl.enterCached(PolyglotEngineImpl.java:1885)
        at com.oracle.truffle.polyglot.HostToGuestRootNode.execute(HostToGuestRootNode.java:112)
        at com.oracle.truffle.api.impl.DefaultCallTarget.callDirectOrIndirect(DefaultCallTarget.java:85)
        at com.oracle.truffle.api.impl.DefaultCallTarget.call(DefaultCallTarget.java:102)
        at com.oracle.truffle.polyglot.PolyglotFunctionProxyHandler.invoke(PolyglotFunctionProxyHandler.java:154)
        at com.sun.proxy.$Proxy60.apply(Unknown Source)
        at org.apache.kafka.streams.kstream.internals.KStreamPeek$KStreamPeekProcessor.process(KStreamPeek.java:41)
        at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:159)
        at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:290)
        at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:269)
        at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:228)
        at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:84)
        at org.apache.kafka.streams.processor.internals.StreamTask.lambda$doProcess$1(StreamTask.java:793)
        at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:872)
        at org.apache.kafka.streams.processor.internals.StreamTask.doProcess(StreamTask.java:793)
        at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:724)
        at org.apache.kafka.streams.processor.internals.TaskExecutor.processTask(TaskExecutor.java:100)
        at org.apache.kafka.streams.processor.internals.TaskExecutor.process(TaskExecutor.java:81)
        at org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:1182)
        at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:768)
        at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:588)
        at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:550)
Caused by: Attached Guest Language Frames (1)

        at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:750)
        at org.apache.kafka.streams.processor.internals.TaskExecutor.processTask(TaskExecutor.java:100)
        at org.apache.kafka.streams.processor.internals.TaskExecutor.process(TaskExecutor.java:81)
        at org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:1182)
        at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:768)
        at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:588)
        at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:550)
Caused by: java.lang.IllegalStateException: Multi threaded access requested by thread   StreamsThread threadId: ngm_streamer_APP-69bb8cc1-3444-4b6f-ba2f-2fdb8666a8b2-StreamThread-3
TaskManager
        MetadataState:
        Tasks:
                0_2 RUNNING StreamTask(active) but is not allowed for language(s) js.
        at com.oracle.truffle.polyglot.PolyglotEngineException.illegalState(PolyglotEngineException.java:129)
        at com.oracle.truffle.polyglot.PolyglotContextImpl.throwDeniedThreadAccess(PolyglotContextImpl.java:941)
        at com.oracle.truffle.polyglot.PolyglotContextImpl.checkAllThreadAccesses(PolyglotContextImpl.java:800)
        at com.oracle.truffle.polyglot.PolyglotContextImpl.enterThreadChanged(PolyglotContextImpl.java:630)
        at com.oracle.truffle.polyglot.PolyglotEngineImpl.enterCached(PolyglotEngineImpl.java:1885)
        at com.oracle.truffle.polyglot.HostToGuestRootNode.execute(HostToGuestRootNode.java:112)
        at com.oracle.truffle.api.impl.DefaultCallTarget.callDirectOrIndirect(DefaultCallTarget.java:85)
        at com.oracle.truffle.api.impl.DefaultCallTarget.call(DefaultCallTarget.java:102)
        at com.oracle.truffle.polyglot.PolyglotFunctionProxyHandler.invoke(PolyglotFunctionProxyHandler.java:154)
        at com.sun.proxy.$Proxy60.apply(Unknown Source)
        at org.apache.kafka.streams.kstream.internals.KStreamPeek$KStreamPeekProcessor.process(KStreamPeek.java:41)
        at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:159)
        at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:290)
        at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:269)
        at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:228)
        at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:84)
        at org.apache.kafka.streams.processor.internals.StreamTask.lambda$doProcess$1(StreamTask.java:793)
        at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:872)
        at org.apache.kafka.streams.processor.internals.StreamTask.doProcess(StreamTask.java:793)
        at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:724)
        ... 6 common frames omitted
Caused by: com.oracle.truffle.api.TruffleStackTrace$LazyStackTrace: null
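The JDK-only sketch below reproduces the access pattern behind this trace without GraalJS or Kafka: a callback is created once (standing in for the JavaScript function passed to `peek`) and is then invoked by several independent threads, named after Kafka's `StreamThread`s. `SharedCallbackDemo` and `invokeFromThreads` are hypothetical names. With a real graal-js function in place of the lambda, calls from two threads overlapping in time are presumably what triggers the `IllegalStateException` above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.BiConsumer;

// Hypothetical demo of the access pattern: one callback, many calling threads.
class SharedCallbackDemo {

    /** Calls the same callback once from each of threadCount threads and
     *  returns the names of the threads that invoked it. */
    static Set<String> invokeFromThreads(int threadCount) {
        Set<String> callers = ConcurrentHashMap.newKeySet();

        // Created once, like the JavaScript function passed to peek(...) in onCustom.
        BiConsumer<String, String> peek = (key, value) ->
                callers.add(Thread.currentThread().getName());

        List<Thread> workers = new ArrayList<>();
        for (int i = 1; i <= threadCount; i++) {
            String key = "key-" + i;
            // Each worker plays the role of a Kafka Streams StreamThread.
            Thread t = new Thread(() -> peek.accept(key, "value"), "StreamThread-" + i);
            workers.add(t);
            t.start();
        }
        for (Thread t : workers) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return callers;
    }
}
```

Calling `SharedCallbackDemo.invokeFromThreads(3)` returns three distinct thread names, all of which used the single `peek` object.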

This is the way we instantiate our script engine:

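```java
import javax.script.ScriptEngine;
import org.graalvm.polyglot.Context;
import org.graalvm.polyglot.HostAccess;
import com.oracle.truffle.js.scriptengine.GraalJSScriptEngine;

ScriptEngine engine = GraalJSScriptEngine.create(null, Context.newBuilder("js")
        .allowHostAccess(HostAccess.ALL)
        .allowHostClassLookup(s -> true)
        .allowExperimentalOptions(true)
        .out(System.out)
        .err(System.err)
        .option("js.nashorn-compat", "true"));
```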

Is there a way to provide Class (or Context) factory to avoid using the same context in different threads?

Environment

GraalVM JS version: 21.3.6
Java version: OpenJDK 8

oubidar-Abderrahim self-assigned this Jun 9, 2023
@oubidar-Abderrahim
Member

Hi, thank you for reporting this. Please take a look at this: Multithreading
Let us know if that answers your question.

oubidar-Abderrahim added the question Further information is requested label Jun 9, 2023
@vepo
Author

vepo commented Jun 11, 2023

Thanks @oubidar-Abderrahim, but this wiki page does not answer my question. My issue is that we create some classes in our JavaScript code that are then called by another framework (Kafka Streams). We could not find a way to synchronize these threads or create one context per thread.

@oubidar-Abderrahim
Member

Hi @iamstolis, can you take a look at this, please?

@iamstolis
Member

Is there a way to provide Class (or Context) factory to avoid using the same context in different threads?

No. You should not use the same (graal-js) ScriptEngine from multiple threads without proper synchronization. I suggest that you either use one ScriptEngine per thread or synchronize access to one global ScriptEngine.
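A minimal sketch of the second option (one shared engine, synchronized access). It assumes that sequential calls from different threads into one graal-js context are allowed and that only concurrent calls are rejected, which is how we read the Multithreading documentation linked earlier in this thread. `ContextLock` and `guard` are hypothetical names, and `java.util.function.BiPredicate` stands in for Kafka Streams' `Predicate<K, V>`, which has the same `test(key, value)` shape.

```java
import java.util.function.BiPredicate;

// Hypothetical helper: create one instance per script engine / polyglot context.
final class ContextLock {
    // A single monitor shared by every function that came from the same context.
    // One lock per function would not be enough: two different guest functions
    // from the same context could still run at the same time.
    private final Object monitor = new Object();

    // Wraps a guest function (BiPredicate stands in for Kafka's Predicate) so that
    // at most one thread at a time enters the context through any guarded function.
    <K, V> BiPredicate<K, V> guard(BiPredicate<K, V> guestFunction) {
        return (key, value) -> {
            synchronized (monitor) {
                return guestFunction.test(key, value);
            }
        };
    }
}
```

All functions created by `onCustom` live in the same context, so the `peek` and `filter` callbacks would be guarded by the same `ContextLock`. The trade-off is that every stream thread serializes on script calls, which limits the benefit of running several threads.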

We could not find a way to synchronize these threads ...

If I understand correctly, your problem is that some framework is using the ScriptEngine (that you provide) from multiple threads. You could try providing your own implementation of ScriptEngine that delegates to the graal-js ScriptEngine. The methods of your wrapper could either synchronize access or delegate to per-thread graal-js ScriptEngines (which your wrapper would create and manage).
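A rough sketch of the per-thread variant of such a wrapper, using only `javax.script`: `PerThreadScriptEngine` (hypothetical name) extends `AbstractScriptEngine` and forwards evaluation to a delegate that a `Supplier` creates lazily for each calling thread. In the real setup the supplier would build a graal-js engine, for example with the `GraalJSScriptEngine.create(...)` call shown in the issue; that part is an assumption and is left out so the sketch compiles with the JDK alone.

```java
import java.io.Reader;
import java.util.function.Supplier;
import javax.script.AbstractScriptEngine;
import javax.script.Bindings;
import javax.script.ScriptContext;
import javax.script.ScriptEngine;
import javax.script.ScriptEngineFactory;
import javax.script.ScriptException;

// Hypothetical wrapper: each calling thread gets its own delegate engine,
// created on first use by the supplied factory.
class PerThreadScriptEngine extends AbstractScriptEngine {
    private final ThreadLocal<ScriptEngine> delegates;

    PerThreadScriptEngine(Supplier<ScriptEngine> factory) {
        this.delegates = ThreadLocal.withInitial(factory);
    }

    // Plain eval calls use the delegate's own default context, so no
    // ScriptContext (and no guest state) is shared between threads.
    @Override
    public Object eval(String script) throws ScriptException {
        return delegates.get().eval(script);
    }

    @Override
    public Object eval(Reader reader) throws ScriptException {
        return delegates.get().eval(reader);
    }

    // Callers passing an explicit ScriptContext must not share it across threads.
    @Override
    public Object eval(String script, ScriptContext ctx) throws ScriptException {
        return delegates.get().eval(script, ctx);
    }

    @Override
    public Object eval(Reader reader, ScriptContext ctx) throws ScriptException {
        return delegates.get().eval(reader, ctx);
    }

    @Override
    public Bindings createBindings() {
        return delegates.get().createBindings();
    }

    @Override
    public ScriptEngineFactory getFactory() {
        return delegates.get().getFactory();
    }
}
```

Inherited methods such as `put`, `get` and `getBindings` still act on the wrapper's own `ScriptContext`; a complete wrapper would forward those (and `Invocable`, if the framework uses it) as well. This only isolates code evaluated through the wrapper: functions a script has already handed to Java stay bound to the context of the thread that created them.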

@vepo
Author

vepo commented Jul 12, 2023

@iamstolis, I'm not sure you understood correctly.

your problem is that some framework is using ScriptEngine (that you provide) from multiple threads.

No. Our framework uses the ScriptEngine but does not create the threads; it provides some classes that are then used by other threads. The threads are created by another framework, in our case Kafka Streams.

In my sample, the onCustom function receives a stream and creates some classes on this stream. The function passed to filter will be used by many classes. I could configure Kafka Streams to use just one thread, but that does not scale well.
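Because these functions are created once, on whichever thread runs `onCustom`, one way to keep several stream threads is to make the Java object that Kafka Streams holds resolve a separate guest function per thread. A minimal sketch with hypothetical names: `PerThreadPredicate` keeps one predicate per calling thread in a `ThreadLocal`, filled by a `Supplier`. It assumes the supplier can recreate the JavaScript filter function in a fresh, thread-private context (for example by evaluating the script in a new graal-js engine and converting the function to the predicate type); that wiring is not shown, and `BiPredicate` again stands in for Kafka's `Predicate<K, V>`.

```java
import java.util.function.BiPredicate;
import java.util.function.Supplier;

// Hypothetical adapter: the topology is built once, but each thread that calls
// test(...) gets its own predicate instance from the factory.
final class PerThreadPredicate<K, V> implements BiPredicate<K, V> {
    private final ThreadLocal<BiPredicate<K, V>> perThread;

    PerThreadPredicate(Supplier<BiPredicate<K, V>> factory) {
        // The factory runs once per thread, on that thread, at its first call.
        this.perThread = ThreadLocal.withInitial(factory);
    }

    @Override
    public boolean test(K key, V value) {
        // A thread only ever touches the instance (and context) created for it.
        return perThread.get().test(key, value);
    }
}
```

Each stream thread then pays the cost of creating its own context once, and any state the script keeps is no longer shared between threads.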


3 participants