Transaction commit, rollback listeners #2816
Comments
Maybe it was not clear where the issue lies, so let me give you this example:
await this.entityManager.transaction(async (transactionManager) => {
  const fooRepo = transactionManager.getCustomRepository(FooRepository);
  const barRepo = transactionManager.getCustomRepository(BarRepository);
  const bazRepo = transactionManager.getCustomRepository(BazRepository);
  await fooRepo.save(new FooEntity()); // @AfterInsert => send email
  await barRepo.save(new BarEntity()); // @AfterInsert => add job to message queue
  await bazRepo.save(new BazEntity()); // throws UniqueConstraintError => rollback
});
All the @AfterInsert hooks should be called only after the transaction has completed, not after each individual save has completed. |
Why do you think As for the example - |
I'm not saying @AfterInsert should hold up the transaction. The problem is that I cannot trigger any action from @AfterInsert, because the insert may still be rolled back. If I want to push an event to a message queue saying that a new Customer was created, for example, I cannot be sure it actually was. Checking the transaction promise means I have to duplicate that check everywhere the customer entity gets inserted, which defeats the point of having a rich domain model. Can we maybe have an @AfterCommit hook for that purpose? |
Let's keep it open for discussion. I think there may be good purposes for use of such decorator. But let others decide if it should be ORM or application functionality. |
If event subscribers had access to the current transaction ID, they would be able to queue up external events such as emails. If another subscriber received the transaction's commit / rollback events, then everything that was queued up for that transaction ID could be released on commit (or discarded on rollback). So I think the responsibility is split between app and ORM: if the ORM gives us the transaction ID in the existing hooks, and adds transaction committed & rolled back hooks, the app can take it from there. |
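To make the proposal above concrete, here is a minimal sketch of the application-side half: actions are queued per transaction ID, run on commit, and dropped on rollback. It assumes the ORM would supply a transaction ID and commit/rollback notifications, which is exactly what is being requested here. `DeferredActionBuffer` and all of its method names are hypothetical and are not part of TypeORM.

```typescript
// Hypothetical helper, not a TypeORM API: buffers side effects per transaction.
type TransactionId = string;
type DeferredAction = () => void;

class DeferredActionBuffer {
  private readonly pending = new Map<TransactionId, DeferredAction[]>();

  // Called from an entity hook (e.g. after insert) instead of acting immediately.
  enqueue(txId: TransactionId, action: DeferredAction): void {
    const queue = this.pending.get(txId) ?? [];
    queue.push(action);
    this.pending.set(txId, queue);
  }

  // Called when the transaction commits: run everything queued for it, in order.
  // Returns the number of actions that were run.
  commit(txId: TransactionId): number {
    const queue = this.pending.get(txId) ?? [];
    this.pending.delete(txId);
    for (const action of queue) action();
    return queue.length;
  }

  // Called when the transaction rolls back: drop queued actions without running them.
  // Returns the number of actions that were dropped.
  rollback(txId: TransactionId): number {
    const dropped = this.pending.get(txId)?.length ?? 0;
    this.pending.delete(txId);
    return dropped;
  }
}
```

In this model an "email" queued for a transaction that later rolls back is never sent, which is the behaviour the thread is asking for. Actions run synchronously here only to keep the sketch short.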
The solution here is to introduce different hooks. The current behaviour isn't related to the ORM; it was just designed to cover one side of the coin. Because there are cases when you want your |
If hooks received the transaction promise as a parameter, you could simply wait for it to be resolved (or rejected) and then perform the action:
afterUpdate(event: UpdateEvent<User>, txn: Promise<TypeORMTransaction>) {
  // Pseudocode: the txn parameter is a proposal, not an existing API
  txn.then(() => {
    event.entity.publishDomainEvents(this.messageQueue);
  });
} |
I'm in this case :
But it would be cool to have a way to wait for the transaction to be committed. |
In every case I only want to respond after the transaction is committed. I imagine there are use-cases for
Solution 1. Manually call a handler after each
Solution 2. Have your typeorm subscriber place an event into another pubsub or eventing system. This works well and can be accomplished for all entities in a small amount of code, assuming you have an eventing system that works for you. BUT it falls apart because the AfterInsert/Update subscriber is not provided with the full entity. In fact, it's only provided with what the user passes into the persistence call, which is fairly useless when we have relations and generated values. So here we are forced to re-query the entity, and now the solution is no longer generic.
Solution 3. Write a wrapper around |
I'm also facing the same issue. I need a way to trigger a background job for post-processing the inserted rows, but if the processor is idle, the job is triggered before the transaction is committed, and cannot find it in the database:
|
Just an update: solution 2 in my post above has the same problem. Even though it seems to work better in some cases, there is still no guarantee that the transaction has finished committing when you need to query the new data back from the db, so you may get stale data while the commit is still in flight. We are back to the drawing board looking for a solution to this problem. |
@pleerock At initial glance, this seems pretty straightforward. Would you anticipate any complexities? Also asked here: #743 (comment) |
After looking at this closer I think this will be a bit of a challenge. Events are broadcast from a few different levels and transactions are managed on many different levels. In some cases, we won't have the context we need to broadcast an event. I think we would have to do something like collect all transaction commit entity events that could get broadcast and return them up through all levels until we arrive at the class that we are going to commit the transaction and then we could fire the collected commit events. @pleerock or anyone else with in-depth knowledge of this functionality please provide any guidance if you can. |
We've also looked at many ways of handling this problem, but fundamentally the transaction being across multiple systems is a problem. One option is to simply move the transaction to a single system.
Step 4 could be a spooler that sends to your normal queueing system, or you could build your queues directly in the DB (for some use cases). With PostgreSQL specifically, you can use LISTEN/NOTIFY in the transaction and the message will only be sent when the commit succeeds. Your spooler could process messages directly, or just trigger a read from the queue. In both cases you can either pipe to another queue system or process directly. Due to the transaction, you're guaranteed that everything will be committed before your queue will see any instructions. Rollbacks are completely invisible to the queue. |
I also think introducing different hooks is the best way forward. My use case is providing notifications on rows being added/removed, and what happens is that for longer transactions the service that receives the notification can't see the added row because the transaction is not committed yet. Yes, LISTEN/NOTIFY might be an option, but in my case it's not feasible, as the notifications are consumed via Kafka. |
Hi, could be a dumb question, but what is afterTransactionCommit doing then? |
I have the same question. What does afterTransactionCommit do? You can't access the entity in there, only the transaction manager. |
@robinjhuang same reason I ended up here, do you have any solutions for finding the entity? |
If anybody ends up here: a solution (workaround) would be to listen to |
I have a solution, but it is not so good:
export class EntitySubscriber implements EntitySubscriberInterface<Entity> {
  private entity: Entity | undefined;
  listenTo = () => Entity;
  /**
   * @param event
   */
  async afterTransactionCommit(event: TransactionCommitEvent) {
    if (!this.entity) return;
    console.log(this.entity);
    // you can do anything
  }
  /**
   * When the specific entity is updated, it is bound to this subscriber instance.
   */
  async afterUpdate(event: UpdateEvent<Entity>) {
    this.entity = event.entity;
  }
} |
This will not work since
Another huge issue here is that the subscribers, to my understanding, are themselves cached during start-up, meaning that storing any state on the subscriber creates all kinds of race conditions. As far as I read from the docs, each transaction gets its own EntityManager object. So for a working solution I would attach all the entities to the
Keep in mind that since the transaction manager is shared between subscribers, it may contain other entities as well. In your specific subscriber you would still need to filter out the entities you are interested in. This is a simple example that seems to work:
import { EntitySubscriberInterface, InsertEvent, UpdateEvent, RemoveEvent } from 'typeorm';

export class TransactionSubscriber implements EntitySubscriberInterface {
  // Collect touched entities on the per-transaction manager, not on the subscriber.
  afterInsert(event: InsertEvent<any>) {
    event.manager['entities'].inserted.push(event.entity);
  }
  afterUpdate(event: UpdateEvent<any>) {
    event.manager['entities'].updated.push(event.entity);
  }
  afterRemove(event: RemoveEvent<any>) {
    event.manager['entities'].removed.push(event.entity);
  }
  // Initialise the buckets once per transaction manager.
  beforeTransactionStart(event) {
    event.manager['entities'] = event.manager['entities'] || {
      inserted: [],
      removed: [],
      updated: [],
    };
  }
}
export class UserRegisteredSubscriber extends TransactionSubscriber {
  // messageQueue is assumed to be provided elsewhere; the original example does not define it.
  listenTo() {
    return User;
  }
  afterTransactionCommit(event) {
    // The manager is shared between subscribers, so keep only the User entities.
    const committedUsers = event.manager['entities'].inserted.filter(entity => entity.constructor === User);
    for (const user of committedUsers) {
      this.messageQueue.push({
        event: 'user.registered',
        data: user.email,
      });
    }
  }
} |
@ibox4real Since I'm still dealing with production issues because of this.... I tried your solution and it almost worked. Is there any way to know if I'm in a transaction in a subscriber method? For example, the So I can easily miss events causing a lot of problems as well. So far, I don't see a way to determine if I'm in a transaction. |
As far as i know only |
|
Actually, we can tell if we are in a transaction through the |
Could you provide your solution here? |
I see this one has been giving people headaches since 2018. Here's my 2 cents:
I'm not describing here any specific way to store "postponed reaction data" in queryRunner. Use your imagination. :) |
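Since the comment above deliberately leaves the storage details open, here is one possible shape, offered as a sketch rather than a confirmed recipe. `RunnerLike` is a stand-in type that mirrors only the `data` and `isTransactionActive` properties a TypeORM `QueryRunner` exposes; the key name `pendingAfterCommit` and all function names are hypothetical. The idea is to call `deferUntilCommit` from `afterInsert`/`afterUpdate`/`afterRemove`, `runDeferred` from `afterTransactionCommit`, and `dropDeferred` from `afterTransactionRollback`, passing `event.queryRunner` each time.

```typescript
// Structural stand-in for the parts of a QueryRunner that this sketch touches.
interface RunnerLike {
  data: Record<string, unknown>;
  isTransactionActive: boolean;
}

type Job = () => void;
const PENDING_KEY = "pendingAfterCommit"; // hypothetical key name

// Returns the job list stored on the runner, creating it on first use.
function pendingJobs(runner: RunnerLike): Job[] {
  const existing = runner.data[PENDING_KEY];
  if (Array.isArray(existing)) return existing as Job[];
  const created: Job[] = [];
  runner.data[PENDING_KEY] = created;
  return created;
}

// Entity hooks: postpone the reaction while a transaction is open.
function deferUntilCommit(runner: RunnerLike, job: Job): void {
  if (!runner.isTransactionActive) {
    job(); // no open transaction in this model, so there is nothing to wait for
    return;
  }
  pendingJobs(runner).push(job);
}

// Commit hook: run and clear everything that was postponed on this runner.
function runDeferred(runner: RunnerLike): void {
  const jobs = pendingJobs(runner).splice(0);
  for (const job of jobs) job();
}

// Rollback hook: forget postponed reactions without running them.
function dropDeferred(runner: RunnerLike): void {
  pendingJobs(runner).length = 0;
}
```

Keeping the state on the query runner rather than on the subscriber avoids the shared-subscriber race described earlier in the thread. Note that another comment here reports `event.queryRunner.data` being empty in `afterTransactionCommit`, so it is worth verifying against your TypeORM version that the same runner instance reaches both hooks.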
I can confirm this approach works. Thanks for shedding the light on this, @constb! |
I get event.queryRunner.data as blank in afterTransactionCommit. |
Issue type:
[ ] question
[ ] bug report
[x] feature request
[ ] documentation issue
Database system/driver:
[ ] cordova
[ ] mongodb
[ ] mssql
[x] mysql / mariadb
[ ] oracle
[ ] postgres
[ ] sqlite
[ ] sqljs
[ ] react-native
[ ] expo
TypeORM version:
[x] latest
[ ] @next
[ ] 0.x.x (or put your version here)
Steps to reproduce or a small repository showing the problem:
My question is whether it is possible to detect, inside a subscriber, whether a transaction was committed or not.
I want to listen to changes on some entity in the subscriber and push the generated events to a message queue.
The problem is that if this is part of a transaction that gets rolled back, afterUpdate() is still fired.
Ideally I would like to know whether the particular transaction was committed or not.
I'd like your thoughts on this, and on whether there is a way to do it in this fashion.