Describe the bug
I'm using a Quarkus service that requires:
emit several messages to another channel as a result of the database queries
All the above steps need to be executed in a single transaction so that, for example, if the database fails, the message is nacked, and if the emission of the messages fails, the database queries are rolled back.
In the Quarkus service, we're using the SmallRye Reactive Messaging Kafka and Hibernate ORM extensions. I'm aware that there is an existing section in the Kafka guide on chaining transactions between Kafka and Hibernate Reactive: https://quarkus.io/guides/kafka#chaining-kafka-transactions-with-hibernate-reactive-transactions, but there is nothing about how to do that with Hibernate ORM, which seems to me like a very important requirement.
In the reproducer, the primary key of author is the name, so we can't have two authors with the same name. As part of the initial "import.sql", we have created an author "Jose" without books. Then, we have a resource to send a message to a topic:
This message will be consumed by the class NewAuthorConsumer, which inserts the author using the given name and sends another message to another topic to create the books. What I expected is that when another author with the same name "Jose" is added and a constraint exception is thrown, the messages to the book topic are not sent.
The implementations of the NewAuthorConsumer that I tried:
Using the KafkaTransactions:
/**
 * Approach 3: Using KafkaTransactions
 */
@ApplicationScoped
public class NewAuthorConsumer {

    @Channel("new-book-out")
    KafkaTransactions<String> kafkaTx;

    @Incoming("new-author-in")
    Uni<Void> consume(Message<String> message) {
        // infof takes printf-style placeholders, so use %s rather than {}
        Log.infof("Received new message in authors topic with name `%s`", message.getPayload());
        return kafkaTx.withTransactionAndAck(message, emitter -> {
            persistAuthor(message.getPayload());
            emitter.send(message.getPayload() + "-book1");
            return Uni.createFrom().voidItem();
        }).replaceWithVoid();
    }

    @Transactional
    public void persistAuthor(String name) {
        Author author = new Author();
        author.name = name;
        author.persist();
    }
}
As expected, this fails with:
Caused by: io.quarkus.runtime.BlockingOperationNotAllowedException: Cannot start a JTA transaction from the IO thread.
The error is expected and correct.
Using Emitter:
@ApplicationScoped
public class NewAuthorConsumer {

    @Channel("new-book-out")
    Emitter<String> newBook;

    @Transactional
    @Incoming("new-author-in")
    CompletionStage<Void> consume(String authorName) {
        // infof takes printf-style placeholders, so use %s rather than {}
        Log.infof("Received new message in authors topic with name `%s`", authorName);
        Author author = new Author();
        author.name = authorName;
        author.persist();
        return newBook.send(authorName + "-book1");
    }
}
This time, when adding an existing author, the insert fails, but the book message is still sent and hence the book is inserted into the database.
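One plausible reading of this result, which is an assumption and not something confirmed in the reproducer, is that the insert is only checked when the transaction commits, while the emitter dispatches the message right away. The plain-Java sketch below models that ordering with hypothetical stand-ins (`FakeDb`, `FakeTx`, and a list playing the role of the topic). It does not use any Quarkus or SmallRye API.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class DeferredCommitDemo {

    /** Hypothetical stand-in for a table whose primary key is the author name. */
    static final class FakeDb {
        final Set<String> authors = new HashSet<>(Set.of("Jose"));
    }

    /** Hypothetical transaction: inserts are queued and only validated at commit. */
    static final class FakeTx {
        private final FakeDb db;
        private final List<String> pending = new ArrayList<>();

        FakeTx(FakeDb db) {
            this.db = db;
        }

        void persist(String name) {
            pending.add(name); // nothing is checked yet
        }

        void commit() {
            for (String name : pending) {
                if (db.authors.contains(name)) {
                    throw new IllegalStateException("duplicate key: " + name);
                }
            }
            db.authors.addAll(pending);
        }
    }

    /** Models the Emitter approach: the message leaves before the commit runs. */
    static List<String> sendThenCommit(FakeDb db, String author) {
        List<String> topic = new ArrayList<>();
        FakeTx tx = new FakeTx(db);
        try {
            tx.persist(author);
            topic.add(author + "-book1"); // sent immediately, outside the transaction
            tx.commit();                  // the constraint violation surfaces only here
        } catch (IllegalStateException e) {
            // the database change is discarded, but the message is already out
        }
        return topic;
    }

    /** Alternative ordering: buffer messages and publish only after a successful commit. */
    static List<String> commitThenSend(FakeDb db, String author) {
        List<String> topic = new ArrayList<>();
        List<String> buffered = new ArrayList<>();
        FakeTx tx = new FakeTx(db);
        try {
            tx.persist(author);
            buffered.add(author + "-book1");
            tx.commit();
            topic.addAll(buffered); // reached only when the commit succeeded
        } catch (IllegalStateException e) {
            // commit failed: nothing was published
        }
        return topic;
    }

    public static void main(String[] args) {
        System.out.println(sendThenCommit(new FakeDb(), "Jose")); // [Jose-book1]
        System.out.println(commitThenSend(new FakeDb(), "Jose")); // []
        System.out.println(commitThenSend(new FakeDb(), "Ana"));  // [Ana-book1]
    }
}
```

Note that publishing after the commit only fixes the ordering seen here. It is not a chained transaction, because a failure while publishing would leave the database change committed.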
Expected behavior
Despite the reactive nature of the SmallRye Reactive Messaging extension, there should be a way to chain transactions between Kafka and Hibernate ORM.
If there is a way to do this, it would need to be documented.
If not, I feel this is a very strong limitation, yet I'm not sure if this should be a bug or a feature request.
Actual behavior
The Kafka and the Hibernate ORM transactions are not chained.
How to Reproduce?
1.- git clone https://github.com/Sgitario/quarkus-examples
2.- cd quarkus-examples/kafka-orm-transactions
3.- comment/uncomment/add the approach you would like to use in NewAuthorConsumer
4.- start dev mode: mvn quarkus:dev
5.- add the existing author: curl -X POST http://localhost:8080/api/authors/Jose
6.- you should see the primary key exception in the service logs:
Caused by: org.h2.jdbc.JdbcSQLIntegrityConstraintViolationException: Violación de indice de Unicidad ó Clave primaria: "PUBLIC.PRIMARY_KEY_7 ON PUBLIC.AUTHOR(NAME) VALUES ( /* 1 */ 'Jose' )"
Unique index or primary key violation: "PUBLIC.PRIMARY_KEY_7 ON PUBLIC.AUTHOR(NAME) VALUES ( /* 1 */ 'Jose' )"; SQL statement:
insert into Author (name) values (?) [23505-224]
7.- check the books: curl http://localhost:8080/api/books
Because of the previous exception, the message should not have been sent and there should not be any books. However, the message was wrongly sent and there are books:
[{"id":1,"title":"Jose-book1"}]
Quarkus version or git rev
999-SNAPSHOT
Build tool (ie. output of mvnw --version or gradlew --version)
mvn
If it can't be fixed or supported, I think this should be noted in the documentation as an incompatibility between these two extensions. I would say this is a major limitation that most users tend to overlook, or simply expect to work.
I've written a reproducer in https://github.com/Sgitario/quarkus-examples/tree/main/kafka-orm-transactions with several approaches I tried (see NewAuthorConsumer.java).
I also raised the same question in smallrye/smallrye-reactive-messaging#2609.