Skip to content

This issue was moved to a discussion.

You can continue the conversation there. Go to discussion →

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

SQS Sending/Receiving Message in mutiny reactive #40587

Closed
prakashelango opened this issue May 13, 2024 · 11 comments
Closed

SQS Sending/Receiving Message in mutiny reactive #40587

prakashelango opened this issue May 13, 2024 · 11 comments

Comments

@prakashelango
Copy link

Description

reactive messaging SQS

as per documentation I have used below two dependencies

implementation 'io.quarkiverse.amazonservices:quarkus-amazon-sqs'
implementation 'io.smallrye.reactive:smallrye-reactive-messaging-aws-sqs:4.21.0'

which fails with "Caused by: java.lang.ClassNotFoundException: org.objectweb.asm.ClassVisitor" ended up adding

    implementation 'org.ow2.asm:asm:9.7'
    implementation 'io.quarkus:quarkus-smallrye-reactive-messaging'

can we have an example with SQS sending and receiving messages.

    @Outgoing("sqs-channel")
    public Multi<Message<String>> generate(final List<String> itemsTOSqs) {
return Multi.createFrom().iterable(itemsTOSqs).map(udmUsageData -> Message.of(itemTOSqs, Metadata.of(SqsOutboundMetadata.builder()
                .deduplicationId("id001")
                .groupId("groupId")
                .build())));
}

using application.yaml

mp:
  messaging:
    outgoing:
      sqs-channel:
        connector: smallrye-sqs
        queue-url: ${SQS_QUEUE_URL:https://sqs.us-west-2.amazonaws.com/[accountid]/sqs_queue.fifo}

which doesn't log any error and I don't see messages in sqs queue. Can we have a clear dependencies list and enhance documentation which would really help.

Implementation ideas

No response

@quarkus-bot
Copy link

quarkus-bot bot commented May 13, 2024

/cc @cescoffier (mutiny), @jponge (mutiny)

@cescoffier
Copy link
Member

I don't believe the SQS connector is supported in Quarkus. For sure adding the ASM dependency is not great (lots of clashes). I think @ozangunalp had an example somewhere.

@ozangunalp
Copy link
Contributor

Normally you don't need the asm dependency, there must be some incompatible versioning.
With Quarkus 3.10.0 and quarkus-amazon-services extension 2.13.1 it is straightforward to use the sqs-connector :

implementation 'io.quarkus:quarkus-messaging'
implementation 'io.quarkiverse.amazonservices:quarkus-amazon-sqs'
implementation 'io.smallrye.reactive:smallrye-reactive-messaging-aws-sqs'

The quarkus-amazon-sqs extension does setup dev services so it is quite easy to start with.

@prakashelango
Copy link
Author


    @Outgoing("sqs-channel")
     @ConsumeEvent("EVENTBUS") // using event bus to process and send to sqs
    public Multi<Message<String>> generate(final List<String> itemsTOSqs) {
return Multi.createFrom().iterable(itemsTOSqs).map(udmUsageData -> Message.of(itemTOSqs, Metadata.of(SqsOutboundMetadata.builder()
                .deduplicationId("id001")
                .groupId("groupId")
                .build())));
}


when used consume event and passed an param as part of function it fails with Caused by: java.lang.ClassNotFoundException: org.objectweb.asm.ClassVisitor

do you have any suggestions on triggering to sqs via event bus or alternate approaches to receive param

@gsmet
Copy link
Member

gsmet commented May 13, 2024

When providing a stack trace, please provide the full stack trace, not just the message. Otherwise it's impossible for us to know what triggered it.

Also your dependency tree might be of interest.

@prakashelango
Copy link
Author

prakashelango commented May 13, 2024

apologies @gsmet here we have the trace.

Caused by: java.lang.reflect.InvocationTargetException
	at java.base/jdk.internal.reflect.DirectMethodHandleAccessor.invoke(DirectMethodHandleAccessor.java:118)
	at java.base/java.lang.reflect.Method.invoke(Method.java:580)
	at io.quarkus.launcher.QuarkusLauncher.launch(QuarkusLauncher.java:56)
	... 5 more
Caused by: java.lang.NoClassDefFoundError: org/objectweb/asm/ClassVisitor
	at io.smallrye.config.ConfigMappingInterface.getClassBytes(ConfigMappingInterface.java:149)
	at io.smallrye.config.ConfigMappingLoader.loadClass(ConfigMappingLoader.java:123)
	at io.smallrye.config.ConfigMappingLoader.getImplementationClass(ConfigMappingLoader.java:103)
	at io.smallrye.config.ConfigMappingLoader$1.computeValue(ConfigMappingLoader.java:23)
	at io.smallrye.config.ConfigMappingLoader$1.computeValue(ConfigMappingLoader.java:20)
	at java.base/java.lang.ClassValue.getFromHashMap(ClassValue.java:229)
	at java.base/java.lang.ClassValue.getFromBackup(ClassValue.java:211)
	at java.base/java.lang.ClassValue.get(ClassValue.java:117)
	at io.smallrye.config.ConfigMappingLoader.configMappingObject(ConfigMappingLoader.java:64)
	at io.smallrye.config.ConfigMappingContext.constructGroup(ConfigMappingContext.java:83)
	at io.smallrye.config.ConfigMappingContext.constructRoot(ConfigMappingContext.java:78)
	at io.smallrye.config.ConfigMappingContext$1.apply(ConfigMappingContext.java:72)
	at io.smallrye.config.ConfigMappingContext$1.apply(ConfigMappingContext.java:67)
	at java.base/java.util.HashMap.computeIfAbsent(HashMap.java:1228)
	at io.smallrye.config.ConfigMappingContext.registerRoot(ConfigMappingContext.java:67)
	at io.smallrye.config.ConfigMappingContext.<init>(ConfigMappingContext.java:56)
	at io.smallrye.config.ConfigMappingProvider$1.get(ConfigMappingProvider.java:65)
	at io.smallrye.config.ConfigMappingProvider$1.get(ConfigMappingProvider.java:62)
	at io.smallrye.config.SecretKeys.doUnlocked(SecretKeys.java:28)
	at io.smallrye.config.ConfigMappingProvider.mapConfiguration(ConfigMappingProvider.java:62)
	at io.smallrye.config.SmallRyeConfig.<init>(SmallRyeConfig.java:84)
	at io.smallrye.config.SmallRyeConfigBuilder.build(SmallRyeConfigBuilder.java:714)
	at io.quarkus.vertx.http.runtime.VertxHttpRecorder.startServerAfterFailedStart(VertxHttpRecorder.java:244)
	at io.quarkus.vertx.http.runtime.devmode.VertxHttpHotReplacementSetup.handleFailedInitialStart(VertxHttpHotReplacementSetup.java:73)
	at io.quarkus.deployment.dev.RuntimeUpdatesProcessor.startupFailed(RuntimeUpdatesProcessor.java:1226)
	at io.quarkus.deployment.dev.IsolatedDevModeMain.firstStart(IsolatedDevModeMain.java:145)
	at io.quarkus.deployment.dev.IsolatedDevModeMain.accept(IsolatedDevModeMain.java:433)
	at io.quarkus.deployment.dev.IDEDevModeMain.accept(IDEDevModeMain.java:71)
	at io.quarkus.deployment.dev.IDEDevModeMain.accept(IDEDevModeMain.java:28)
	at io.quarkus.bootstrap.app.CuratedApplication.runInCl(CuratedApplication.java:138)
	at io.quarkus.bootstrap.app.CuratedApplication.runInAugmentClassLoader(CuratedApplication.java:93)
	at io.quarkus.bootstrap.IDELauncherImpl.launch(IDELauncherImpl.java:93)
	at java.base/jdk.internal.reflect.DirectMethodHandleAccessor.invoke(DirectMethodHandleAccessor.java:103)
	... 7 more
Caused by: java.lang.ClassNotFoundException: org.objectweb.asm.ClassVisitor
	at io.quarkus.launcher.RuntimeLaunchClassLoader.findClass(RuntimeLaunchClassLoader.java:25)
	at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:593)
	at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:526)
	at io.quarkus.bootstrap.classloading.QuarkusClassLoader.loadClass(QuarkusClassLoader.java:518)
	at io.quarkus.bootstrap.classloading.QuarkusClassLoader.loadClass(QuarkusClassLoader.java:468)
	... 40 more

dependencies


    implementation enforcedPlatform("${quarkusPlatformGroupId}:${quarkusPlatformArtifactId}:${quarkusPlatformVersion}")
    implementation enforcedPlatform("${quarkusPlatformGroupId}:quarkus-amazon-services-bom:${quarkusPlatformVersion}")
    //Quarkus
    implementation 'io.quarkus:quarkus-config-yaml'
    implementation 'io.quarkus:quarkus-smallrye-health'
    implementation 'io.quarkus:quarkus-arc'
    implementation 'io.quarkus:quarkus-resteasy-jackson'
    implementation 'io.quarkus:quarkus-vertx'

    implementation 'io.quarkus:quarkus-messaging'
    implementation 'io.quarkiverse.amazonservices:quarkus-amazon-sqs'
    implementation 'io.smallrye.reactive:smallrye-reactive-messaging-aws-sqs'

    testImplementation 'io.quarkus:quarkus-junit5'
    testImplementation 'io.quarkus:quarkus-junit5-mockito'
    testImplementation 'io.quarkus:quarkus-jacoco'
    testImplementation 'org.projectlombok:lombok:1.18.30'
    testAnnotationProcessor 'org.projectlombok:lombok:1.18.30'

    // Aws
    implementation 'software.amazon.awssdk:url-connection-client'
    implementation 'io.quarkiverse.amazonservices:quarkus-amazon-dynamodb-enhanced'
    implementation "software.amazon.awssdk:dynamodb"
    implementation 'io.quarkiverse.amazonservices:quarkus-amazon-kinesis'
    implementation 'com.amazonaws:amazon-kinesis-producer:0.15.10'
    implementation "org.json:json:20231013"
    implementation (group: "software.amazon.kinesis", name: "amazon-kinesis-client", version: "2.5.4") {
        exclude group: "org.json", module: "json"
        exclude group: "com.charleskorn.kaml", module: "kaml"
        exclude group: "org.xerial.snappy", module: "snappy-java"
        exclude group: "com.squareup.okio", module: "okio"
        exclude group: "org.apache.avro", module: "avro"
        exclude group: "org.jetbrains.kotlin", module: "kotlin-stdlib"
    }

    implementation 'com.fasterxml.jackson.datatype:jackson-datatype-jsr310'
    implementation ("com.datadoghq:java-dogstatsd-client:4.0.0") {
        exclude group: "com.github.jnr", module: "jnr-unixsocket"
    }
    implementation 'com.google.protobuf:protobuf-java-util:3.17.3'
    implementation 'io.vavr:vavr:1.0.0-alpha-4'

    implementation 'org.projectlombok:lombok:1.18.30'
    annotationProcessor 'org.projectlombok:lombok:1.18.30'

@ozangunalp
Copy link
Contributor

ozangunalp commented May 14, 2024

Also your dependency tree might be of interest.

+1

Also the event bus @ConsumeEvent annotation and messaging @Outgoing doesn't work together. I suggest you replace the event-bus address with an Emitter channel.

@prakashelango
Copy link
Author

prakashelango commented May 14, 2024

Also the event bus @ConsumeEvent annotation and messaging @Outgoing doesn't work together. I suggest you replace the event-bus address with an Emitter channel.

you mean sqs outgoing channel?

//At producer Existing
private final EventBus eventBus;

//Existing
.invoke(usageDataEntity -> eventBus.send("EVENTBUS", usageDataEntity))

// new?
.invoke(usageDataEntity -> eventBus.send("sqs-channel", usageDataEntity))
//At consumer responsible for sending events to sqs

     @Outgoing("sqs-channel") //Existing
     @ConsumeEvent("EVENTBUS")

 @Outgoing("sqs-channel") // new?
  public Multi<Message<String>> generate(final List<String> itemsTOSqs) {
return Multi.createFrom().iterable(itemsTOSqs).map(udmUsageData -> Message.of(itemTOSqs, Metadata.of(SqsOutboundMetadata.builder()
                .deduplicationId("id001")
                .groupId("groupId")
                .build())));
}

is this what you are expecting

@ozangunalp
Copy link
Contributor

You can either use the emitter directly :

@Ougoing("sqs-channel")
private final Emitter<String> sqsEmitter;

//Existing
.invoke(usageDataEntity -> sqsEmitter.send(...message)) // call send in for loop

Or use an internal channel instead of event bus:

@Ougoing("sqs-sender")
private final Emitter<List<String>> sqsEmitter;


.invoke(usageDataEntity -> sqsEmitter.send(usageDataEntity))

@Incoming("sqs-sender")
 @Outgoing("sqs-channel")
  public Multi<Message<String>> generate(final List<String> itemsTOSqs) {
return Multi.createFrom().iterable(itemsTOSqs).map(udmUsageData -> Message.of(itemTOSqs, Metadata.of(SqsOutboundMetadata.builder()
                .deduplicationId("id001")
                .groupId("groupId")
                .build())));
}

Or if your invoke is indeed a Mutiny stream you can subscribe with the outgoing channel directly.

@Channel("sqs-channel")
Subscriber<Message<String>> sqsOutStream;

...
.flatMap(usageDataEntity -> Multi.createFrom().iterable(itemsTOSqs).map(udmUsageData -> Message.of(itemTOSqs, Metadata.of(SqsOutboundMetadata.builder()
                .deduplicationId("id001")
                .groupId("groupId")
                .build()))))
.subscribe().withSubscriber(sqsOutStream);

The pseudocode may have some errors but I hope you get the idea.

@prakashelango
Copy link
Author

@ozangunalp Thanks for your response.

I have tried with emitter way which doesn't have any issue till sending messages, however it doesn't log error or emit message to queue

eventBus.send("SQS_EVENT", List.of("Sample")) // producer

    @Channel("sqs_channel")
    Emitter<String> sqsEmitter;

    @ConsumeEvent("SQS_EVENT")
    public void sendToSQSQueue(final List<String> itemsToSqs) {
        sqsEmitter.send("...message"); // some plain string to test
        }
mp:
  messaging:
    outgoing:
      sqs_channel:
        connector: smallrye-sqs
        queue:
          url: ${SQS_QUEUE_URL:https://sqs.us-west-2.amazonaws.com/[acctid]/test_queue.fifo}
public class SqsConfig {

    @Produces
    public SqsAsyncClient sqsAsyncClient() {
        return SqsAsyncClient.builder()
            .region(Region.of("us-west-2"))
            .build();
    }
}

using a internal channel on field fails with injection error '@Outgoing' not applicable to field

@Outgoing("sqs-sender")
private final Emitter<List<String>> sqsEmitter;

If possible can you share a working example or let me know if needed a reproducible code.

@ozangunalp
Copy link
Contributor

I've pushed an example project here: https://github.com/ozangunalp/quarkus-aws-sqs-connector

If you send a reproducer I can take a look.

I am changing this issue to a discussion.

@quarkusio quarkusio locked and limited conversation to collaborators May 15, 2024
@ozangunalp ozangunalp converted this issue into discussion #40654 May 15, 2024

This issue was moved to a discussion.

You can continue the conversation there. Go to discussion →

Projects
None yet
Development

No branches or pull requests

4 participants