Transaction commit, rollback listeners #2816

Open
ibox4real opened this issue Sep 18, 2018 · 30 comments

@ibox4real

ibox4real commented Sep 18, 2018

Issue type:

[ ] question
[ ] bug report
[x] feature request
[ ] documentation issue

Database system/driver:

[ ] cordova
[ ] mongodb
[ ] mssql
[x] mysql / mariadb
[ ] oracle
[ ] postgres
[ ] sqlite
[ ] sqljs
[ ] react-native
[ ] expo

TypeORM version:

[x] latest
[ ] @next
[ ] 0.x.x (or put your version here)

Steps to reproduce or a small repository showing the problem:

My question is whether it is possible, inside a subscriber, to find out if a transaction was committed.

I want to listen to changes on some entity in the subscriber and push generated events to a message queue.

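import { EntitySubscriberInterface, EventSubscriber, UpdateEvent } from "typeorm";

@EventSubscriber()
export class UserSubscriber implements EntitySubscriberInterface<User> {
    // User and MessageQueue are application classes.
    constructor(private messageQueue: MessageQueue) {}

    listenTo() {
        return User;
    }

    // Runs as soon as the UPDATE has been executed, which may be before the
    // surrounding transaction commits (or rolls back).
    afterUpdate(event: UpdateEvent<User>) {
        event.entity?.publishDomainEvents(this.messageQueue);
    }
}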

The problem is that if this update is part of a transaction that gets rolled back, afterUpdate() is still fired.

Ideally I would like to listen for whether that particular transaction was committed.

@EventSubscriber()
export class UserSubscriber implements EntitySubscriberInterface<User> {
    constructor(private messageQueue: MessageQueue) {}

    listenTo() {
        return User;
    }

    afterUpdate(event: UpdateEvent<User>) {
        // Pseudo code: a proposed 'commit' event on the query runner.
        event.queryRunner.on('commit', () => {
            event.entity.publishDomainEvents(this.messageQueue);
        });
    }
}

What are your thoughts on this, and is there a way to do it in this fashion?

@ibox4real
Author

Maybe it was not clear where the issue lies, so let me give you this example:

await this.entityManager.transaction(async (transactionManager) => {
    const fooRepo = transactionManager.getCustomRepository(FooRepository);
    const barRepo = transactionManager.getCustomRepository(BarRepository);
    const bazRepo = transactionManager.getCustomRepository(BazRepository);

    await fooRepo.save(new FooEntity()); // @AfterInsert => send email
    await barRepo.save(new BarEntity()); // @AfterInsert => add job to message queue
    await bazRepo.save(new BazEntity()); // throws a unique constraint error => rollback
});

All the @AfterInsert hooks should be called only after the transaction has completed, not after each individual save has completed.

@Kononnable
Contributor

Why do you think @AfterInsert should be called after the transaction has completed? It's just like a trigger (but on the client side). If you want to check whether the transaction was committed, you can check whether the transaction promise was rejected.

As for the example, @AfterInsert shouldn't be responsible for sending an email. The transaction should be closed as soon as possible, so the email should be sent afterwards.
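Checking the transaction promise does not have to be repeated at every call site; it can be wrapped once in application code. A minimal sketch, assuming a hypothetical `runInTransaction` function that stands in for `entityManager.transaction`: side effects are registered through `afterCommit` while the transaction runs and executed only once its promise has resolved.

```typescript
// Hypothetical stand-in for EntityManager.transaction: resolves if the work
// commits, rejects if it rolls back.
type TransactionRunner = <T>(work: () => Promise<T>) => Promise<T>;

type AfterCommit = (action: () => void | Promise<void>) => void;

// Runs `work` in a transaction and executes the registered side effects
// (emails, queue pushes, ...) only after the transaction promise resolved.
export async function withAfterCommit<T>(
  runInTransaction: TransactionRunner,
  work: (afterCommit: AfterCommit) => Promise<T>,
): Promise<T> {
  const pending: Array<() => void | Promise<void>> = [];
  const result = await runInTransaction(() =>
    work((action) => {
      pending.push(action);
    }),
  );
  // Only reached when the transaction promise resolved; on rejection the
  // `await` above throws and the pending actions are simply dropped.
  for (const action of pending) {
    await action();
  }
  return result;
}
```

This only means "committed" for the outermost transaction; if the wrapped call itself runs inside another transaction, the problem described in this issue comes back.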

@ibox4real
Copy link
Author

I'm not saying @AfterInsert should hold up the transaction. I cannot trigger any action based on AfterInsert because the insert may be rolled back. If I want to push an event to a message queue saying, for example, that a new Customer was created, I cannot be sure it was. Checking the transaction promise means I have to duplicate that check everywhere the customer entity gets inserted, which defeats the point of having a rich domain model. Can we maybe have an @afterCommit hook for that purpose?

@Kononnable
Contributor

Let's keep it open for discussion. I think there may be good uses for such a decorator, but let others decide whether it should be ORM or application functionality.

@mariusmarais

If event subscribers had access to the current transaction ID, they would be able to queue up external events such as emails. If another subscriber then received the transaction's commit/rollback events, everything queued up for that transaction ID could be let loose.

So I think the responsibility is a split between app and ORM: if the ORM gives us the transaction ID in the existing hooks, and adds transaction committed & rolled back hooks, the app can take it from there.
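The application side of that split could look roughly like this. A minimal sketch, assuming the ORM provided a transaction ID in the entity hooks plus committed and rolled-back hooks (both hypothetical here, as are the `TransactionOutbox` name and its methods): messages are buffered per transaction ID and either released or dropped when the transaction ends.

```typescript
// Buffers outgoing messages per transaction ID. `dispatch` is whatever
// actually sends them (email client, message-queue producer, ...).
export class TransactionOutbox<M> {
  private readonly pending = new Map<string, M[]>();

  constructor(private readonly dispatch: (message: M) => void) {}

  // Called from entity hooks (afterInsert/afterUpdate/...) with the transaction ID.
  enqueue(txId: string, message: M): void {
    const queue = this.pending.get(txId) ?? [];
    queue.push(message);
    this.pending.set(txId, queue);
  }

  // Called from a hypothetical "transaction committed" hook: let everything loose.
  committed(txId: string): void {
    const queue = this.pending.get(txId) ?? [];
    this.pending.delete(txId);
    queue.forEach((message) => this.dispatch(message));
  }

  // Called from a hypothetical "transaction rolled back" hook: drop the buffer.
  rolledBack(txId: string): void {
    this.pending.delete(txId);
  }

  pendingCount(txId: string): number {
    return this.pending.get(txId)?.length ?? 0;
  }
}
```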

@pleerock
Copy link
Member

pleerock commented Jan 25, 2019

The solution here is to introduce different hooks. The current behaviour isn't a bug in the ORM; it was just designed to cover one side of the coin, because there are cases when you want your save to fail if your code in subscribers fails.
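Both behaviours can be illustrated with a toy transaction runner. This is a hypothetical sketch, not TypeORM internals: hooks that run before COMMIT can veto the transaction by throwing, while hooks that run after COMMIT only see committed data and can no longer undo it, so their errors are merely collected.

```typescript
type Hook = () => void;

interface ToyTransactionHooks {
  beforeCommit: Hook[]; // inside the transaction: a throw aborts it
  afterCommit: Hook[]; // after COMMIT: a throw is reported, the data stays committed
}

export function runToyTransaction(
  hooks: ToyTransactionHooks,
): { state: "committed" | "rolled back"; afterCommitErrors: unknown[] } {
  try {
    hooks.beforeCommit.forEach((hook) => hook());
  } catch {
    // ...ROLLBACK would be issued here; after-commit hooks never run.
    return { state: "rolled back", afterCommitErrors: [] };
  }
  // ...COMMIT would be issued here...
  const afterCommitErrors: unknown[] = [];
  for (const hook of hooks.afterCommit) {
    try {
      hook();
    } catch (error) {
      afterCommitErrors.push(error);
    }
  }
  return { state: "committed", afterCommitErrors };
}
```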

@moltar

moltar commented Oct 31, 2019

If hooks receive transaction promise as a param, then you could simply wait for it to be resolved (or rejected) and then do the action.

afterUpdate(event: UpdateEvent<User>, txn: Promise<TypeORMTransaction>) {
    // Pseudo code
    txn.then(() => {
        event.entity.publishDomainEvents(this.messageQueue);
    });
}
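One caveat with this pseudo code: if the ORM awaited the hook before committing, a hook that awaited (or returned) `txn` would wait for a commit that in turn waits for the hook. A minimal simulation, assuming a hypothetical ORM that awaits hooks and passes them a promise settled on COMMIT or ROLLBACK (`saveInToyTransaction` and `makePublishingHook` are illustrative names): the hook attaches its callback and returns immediately, and it handles the rollback case so the rejection is not left unhandled.

```typescript
type AfterUpdateHook = (txn: Promise<void>) => void | Promise<void>;

// Hypothetical ORM-side flow: `txn` settles on COMMIT (resolve) or ROLLBACK
// (reject); the hook is awaited inside the transaction, before that happens.
export async function saveInToyTransaction(hook: AfterUpdateHook, fail: boolean): Promise<void> {
  let settle!: (committed: boolean) => void;
  const outcome = new Promise<boolean>((resolve) => (settle = resolve));
  const txn = outcome.then((committed) => {
    if (!committed) throw new Error("rolled back");
  });
  await hook(txn); // a hook that awaited `txn` here would never return
  settle(!fail); // COMMIT or ROLLBACK
  await txn.catch(() => undefined);
}

// Fire-and-forget: schedule the side effect for after COMMIT and return at once.
export function makePublishingHook(publish: () => void): AfterUpdateHook {
  return (txn) => {
    txn.then(publish, () => {
      // Rolled back: nothing to publish.
    });
  };
}
```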

@greg-hoarau

I'm in this situation:

... there are cases when you want your save to fail if your code in subscribers fail.

But it would be cool to have a way to wait for the transaction to be committed.

@greenreign

greenreign commented Jun 16, 2020

In every case, I only want to respond after the transaction is committed. I imagine there are use cases for BeforeInsertCommit or BeforeUpdateCommit, which is really what is happening now. But for:

AuditLogger (record audit trail)
SearchService (index entity details into search)
UserNotificationService (email, SMS users)
EntityAccountingService (update unread counts)
EntitySubscriberService (notify client subscribers that something has changed)

AfterInsert/AfterUpdate look like they would be perfect for this. However, we found that, since the transaction is still open, you run into all sorts of locking problems.

Solution 1: Manually call a handler after each save/update/delete call. This mixes concerns a bit but could be made generic. Still, it's brittle: someone could miss a call or make it erroneously, and it has to be called for every entity, which usually means many different classes. It's just not a good approach for something that needs to happen every single time.

Solution 2: Have your TypeORM subscriber place an event into another pub/sub or eventing system. This works well and can be done for all entities in a small amount of code, assuming you have an eventing system that works for you. BUT it falls apart because the AfterInsert/AfterUpdate subscriber is not provided with the full entity; it only receives what the user passes into the persistence call. That is fairly useless when we have relations and generated values, so here we are forced to re-query the entity, and the solution is no longer generic.

Solution 3: Write a wrapper around Repository to emit your custom events to your custom eventing system. This is a bit of a mess if you are using something like NestJS, which provides repositories for you.
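Solution 3 can be sketched against a minimal structural `Saver<T>` interface (an assumption; TypeORM's `Repository.save` has more overloads and options). Because the event is built from the value `save()` resolves with, it carries whatever the ORM returned rather than only the caller's input. Note that it still fires when `save()` resolves, which is before COMMIT if the call runs inside an outer transaction.

```typescript
// Minimal structural view of the repository method being wrapped.
interface Saver<T> {
  save(entity: T): Promise<T>;
}

type EntityEventListener<T> = (event: { type: "saved"; entity: T }) => void;

// Wraps a repository so that every successful save emits an application event.
export class EventingRepository<T> implements Saver<T> {
  constructor(
    private readonly inner: Saver<T>,
    private readonly emit: EntityEventListener<T>,
  ) {}

  async save(entity: T): Promise<T> {
    const saved = await this.inner.save(entity); // a failed save emits nothing
    this.emit({ type: "saved", entity: saved });
    return saved;
  }
}
```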

@axos88
Contributor

axos88 commented Jul 18, 2020

I'm also facing the same issue. I need a way to trigger a background job that post-processes the inserted rows, but if the processor is idle, the job runs before the transaction is committed and cannot find the row in the database:


afterInsert(event: InsertEvent<Photo>): Promise<any> | void {
    console.log("afterInsert");
    const data: ProcessPhotoJob = {
        photoId: event.entity.id,
    };

    // This needs to be executed only after the transaction has committed.
    this.queue.add(PROCESS_PHOTO, data);
}

@greenreign

Just an update: solution 2 in my post above has the same problem. Even though it seems to work better in some cases, there is still no guarantee that the transaction has finished committing when you query the new data back from the DB, so you can get stale data while the commit is still in flight.

We are back to the drawing board looking for a solution to this problem.

@greenreign

greenreign commented Jul 24, 2020

@pleerock
Would you welcome a PR for this? I would imagine three new events:
afterInsertCommit
afterUpdateCommit
afterRemoveCommit

At initial glance, this seems pretty straightforward. Would you anticipate any complexities?

Also asked here: #743 (comment)

@greenreign

After looking at this closer I think this will be a bit of a challenge. Events are broadcast from a few different levels and transactions are managed on many different levels. In some cases, we won't have the context we need to broadcast an event.

I think we would have to collect all the commit entity events that could get broadcast and pass them up through all the levels until we reach the class that commits the transaction, and only then fire the collected commit events.

@pleerock or anyone else with in-depth knowledge of this functionality please provide any guidance if you can.
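The collect-and-bubble-up idea above can be sketched with nested scopes, much like savepoints: a nested scope hands its events to its parent on commit, only the outermost scope fires them, and a rollback at any level discards that level's events. `CommitScope` is a hypothetical name, not part of TypeORM.

```typescript
export class CommitScope<E> {
  private events: E[] = [];

  constructor(
    private readonly fire: (event: E) => void,
    private readonly parent?: CommitScope<E>,
  ) {}

  // Opens a nested scope (e.g. a savepoint or an inner save() call).
  nested(): CommitScope<E> {
    return new CommitScope(this.fire, this);
  }

  record(event: E): void {
    this.events.push(event);
  }

  commit(): void {
    const collected = this.events;
    this.events = [];
    if (this.parent) {
      // Not the real COMMIT yet: hand the events up one level.
      this.parent.events.push(...collected);
    } else {
      collected.forEach((event) => this.fire(event));
    }
  }

  rollback(): void {
    this.events = [];
  }
}
```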

@mariusmarais

We've also looked at many ways of handling this problem, but fundamentally the problem is that the transaction spans multiple systems. One option is to simply move the transaction into a single system.

  1. Create separate tables for events, either "do-thing" tables (send email) or "thing-happened" event tables (user activated).
  2. Write to these tables as part of a normal transaction. This makes half-completed operations impossible.
  3. Commit.
  4. In another process, read from your event tables and process.

Step 4 could be a spooler that sends to your normal queueing system, or you could build your queues directly in the DB (for some use cases).

With PostgreSQL specifically, you can use LISTEN/NOTIFY in the transaction and the message will only be sent when the commit succeeds. Your spooler could process messages directly, or just trigger a read from the queue. In both cases you can either pipe to another queue system or process directly.

Due to the transaction, you're guaranteed that everything will be committed before your queue will see any instructions. Rollbacks are completely invisible to the queue.
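The four outbox steps above can be simulated without a real database. In this sketch (`ToyDatabase`, `OutboxRow` and `spool` are hypothetical names), the business row and the event row are written in the same all-or-nothing unit of work, and the spooler only ever sees committed rows. With TypeORM, the outbox would presumably be an ordinary entity saved through the transaction's `EntityManager`.

```typescript
interface OutboxRow {
  id: number;
  type: string;
  payload: string;
  processedAt?: Date;
}

type NewOutboxRow = Omit<OutboxRow, "id" | "processedAt">;

interface UnitOfWork {
  users: string[];
  outbox: NewOutboxRow[];
}

// Stand-in for the database: `transaction` applies all writes or none.
export class ToyDatabase {
  readonly users: string[] = [];
  readonly outbox: OutboxRow[] = [];
  private nextId = 1;

  transaction(work: (tx: UnitOfWork) => void): void {
    const tx: UnitOfWork = { users: [], outbox: [] };
    work(tx); // throwing here leaves the committed tables untouched
    this.users.push(...tx.users);
    for (const row of tx.outbox) {
      this.outbox.push({ ...row, id: this.nextId++ });
    }
  }
}

// Step 4: a separate process reads committed, unprocessed rows and hands them on.
export function spool(db: ToyDatabase, send: (row: OutboxRow) => void): number {
  const batch = db.outbox.filter((row) => row.processedAt === undefined);
  for (const row of batch) {
    send(row);
    row.processedAt = new Date();
  }
  return batch.length;
}
```

Because `spool` marks a row as processed only after sending it, a crash in between can resend it, so consumers should be idempotent.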

@zaro
Contributor

zaro commented Oct 21, 2020

I also think introducing different hooks is the best way forward. My use case is providing notifications on rows being added or removed, and what happens is that, for longer transactions, the service that receives the notification can't see the added row because the transaction is not committed yet.

Yes, LISTEN/NOTIFY might be an option, but in my case it's not feasible, as the notifications are consumed via Kafka.

@psteinroe

Hi,

could be a dumb question, but what is afterTransactionCommit doing then?

@robinjhuang

I have the same question. What does afterTransactionCommit do? You can't access the entity in there, only the transaction manager.

@armingjazi

@robinjhuang same reason I ended up here, do you have any solutions for finding the entity?

@armingjazi

If anybody ends up here: a workaround is to listen to afterInsert, cache the entities in an array, and pop them in the afterTransactionCommit listener.

@sesame-goma

sesame-goma commented Feb 3, 2021

I have a solution, but it's not a very good one.

export class EntitySubscriber implements EntitySubscriberInterface<Entity> {
  private entity: Entity | undefined;
  listenTo = () => Entity;

  /**
   * @param event
   */
  async afterTransactionCommit(event: TransactionCommitEvent) {
    if (!this.entity) return;
    console.log(this.entity);
    // you can do anything
  }

  /**
   * when the specific entity is updated, it will bind the entity to this.
   */
  async afterUpdate(event: UpdateEvent<Entity>) {
    this.entity = event.entity;
  }
}

@ibox4real
Author

ibox4real commented Mar 19, 2021

Replying to @sesame-goma's workaround above:

This will not work, since afterUpdate is called for each entity separately but afterTransactionCommit is called only once, at the end of the transaction. You would only get the last entity that was committed.

Another big issue is that, to my understanding, subscribers are instantiated once at start-up and shared, so storing any state on the subscriber creates all kinds of race conditions between concurrent transactions.
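Both problems can be reproduced without a database. A minimal sketch with hypothetical names, using plain classes in place of subscribers and string IDs in place of transactions: a single shared field keeps only the last entity and is overwritten by a concurrent transaction, whereas state kept per transaction keeps every entity for the right commit.

```typescript
type TxId = string;

// Like the workaround above: one field shared by every transaction.
export class SingleFieldCollector {
  private entity: string | undefined;

  afterUpdate(_tx: TxId, entity: string): void {
    this.entity = entity; // overwrites whatever any transaction stored before
  }

  afterTransactionCommit(_tx: TxId): string[] {
    return this.entity === undefined ? [] : [this.entity];
  }
}

// State kept per transaction, which is what storing it on the
// transaction's EntityManager (or QueryRunner) achieves.
export class PerTransactionCollector {
  private readonly byTx = new Map<TxId, string[]>();

  afterUpdate(tx: TxId, entity: string): void {
    this.byTx.set(tx, [...(this.byTx.get(tx) ?? []), entity]);
  }

  afterTransactionCommit(tx: TxId): string[] {
    const entities = this.byTx.get(tx) ?? [];
    this.byTx.delete(tx);
    return entities;
  }
}
```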

As far as I can tell from the docs, each transaction gets its own EntityManager object. So for a working solution I would attach all the entities to that manager and later read them in afterTransactionCommit.

Keep in mind that, since the transaction's manager is shared between subscribers, it may contain other entities as well. In your specific subscriber you would still need to filter out the entities you are interested in.

This is a simple example that seems to work:

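import {
  EntitySubscriberInterface, EventSubscriber, InsertEvent, RemoveEvent,
  TransactionCommitEvent, TransactionStartEvent, UpdateEvent,
} from "typeorm";

// Collects the entities touched in a transaction on that transaction's EntityManager.
export class TransactionSubscriber implements EntitySubscriberInterface {
  afterInsert(event: InsertEvent<any>) {
    // `entities` only exists inside a transaction (see beforeTransactionStart).
    event.manager['entities']?.inserted.push(event.entity);
  }

  afterUpdate(event: UpdateEvent<any>) {
    event.manager['entities']?.updated.push(event.entity);
  }

  afterRemove(event: RemoveEvent<any>) {
    event.manager['entities']?.removed.push(event.entity);
  }

  beforeTransactionStart(event: TransactionStartEvent) {
    event.manager['entities'] = event.manager['entities'] || {
      inserted: [],
      removed: [],
      updated: [],
    };
  }
}

// User and MessageQueue are application classes.
@EventSubscriber()
export class UserRegisteredSubscriber extends TransactionSubscriber {
  constructor(private messageQueue: MessageQueue) {
    super();
  }

  listenTo() {
    return User;
  }

  afterTransactionCommit(event: TransactionCommitEvent) {
    const inserted = event.manager['entities']?.inserted ?? [];
    const committedUsers = inserted.filter((entity) => entity.constructor === User);
    for (const user of committedUsers) {
      this.messageQueue.push({
        event: 'user.registered',
        data: user.email,
      });
    }
  }
}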

@tataqwerty

@greenreign

@ibox4real Since I'm still dealing with production issues because of this, I tried your solution and it almost worked. Is there any way to know whether I'm in a transaction in a subscriber method? For example, afterTransactionCommit is never called for repository.delete.

So I can easily miss events causing a lot of problems as well. So far, I don't see a way to determine if I'm in a transaction.

@ibox4real
Author

Quoting @greenreign: "For example, the afterTransactionCommit is never called for repository.delete"

As far as I know, only save/remove trigger the EntitySubscriber.

@greenreign

greenreign commented Sep 20, 2021

`delete` does call afterRemove, but not inside a transaction, so the approach outlined above fails. If I could tell that I was not in a transaction, I could reliably fire my async code right away, but there doesn't appear to be any way to know this.

@greenreign

greenreign commented Sep 21, 2021

Actually, we can tell whether we are in a transaction through the queryRunner, so my solution is similar to @ibox4real's but also checks whether we are in a transaction. When inside a transaction, I queue my downstream event by storing it in the entity manager and fire all stored events when afterTransactionCommit is called. When not in a transaction, I fire my downstream event immediately.

@likern

likern commented Jan 5, 2022

Quoting @greenreign: "Actually, we can tell if we are in a transaction through the queryRunner"

Could you provide your solution here?

@constb
Contributor

constb commented Mar 13, 2022

I see this one has been giving people headaches since 2018. Here's my 2 cents:

  1. check event.queryRunner.isTransactionActive to see if we need to postpone "reaction".
  2. if we're postponing use event.queryRunner.data – it's intended to store arbitrary data related to this particular queryRunner "activity" and will be cleared after queryRunner is recycled (after transaction is committed/rolled back)
  3. in afterTransactionCommit if event.queryRunner.data contains postponed reaction data, perform the reaction.

I'm not describing here any specific way to store "postponed reaction data" in queryRunner. Use your imagination. :)
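A minimal sketch of these three steps. To keep it self-contained, the event types below are structural stand-ins mirroring the fields mentioned above (`queryRunner.isTransactionActive`, `queryRunner.data`); in a real subscriber they would be TypeORM's `InsertEvent`, `TransactionCommitEvent` and `TransactionRollbackEvent`. The storage key and the `publish` callback are assumptions.

```typescript
// Structural stand-ins for the parts of TypeORM's QueryRunner and events used here.
interface QueryRunnerLike {
  isTransactionActive: boolean;
  data: Record<string, any>;
}

interface EntityEventLike<T> {
  entity: T;
  queryRunner: QueryRunnerLike;
}

interface TransactionEventLike {
  queryRunner: QueryRunnerLike;
}

export class PostponingSubscriber<T> {
  constructor(
    private readonly key: string, // hypothetical key inside queryRunner.data
    private readonly publish: (entity: T) => void,
  ) {}

  afterInsert(event: EntityEventLike<T>): void {
    const { queryRunner } = event;
    // 1. No active transaction: the row is already visible, react right away.
    if (!queryRunner.isTransactionActive) {
      this.publish(event.entity);
      return;
    }
    // 2. Inside a transaction: postpone the reaction on this query runner.
    const postponed: T[] = queryRunner.data[this.key] ?? [];
    postponed.push(event.entity);
    queryRunner.data[this.key] = postponed;
  }

  // 3. After COMMIT: perform the postponed reactions.
  afterTransactionCommit(event: TransactionEventLike): void {
    const postponed: T[] = event.queryRunner.data[this.key] ?? [];
    delete event.queryRunner.data[this.key];
    postponed.forEach((entity) => this.publish(entity));
  }

  // After ROLLBACK: drop them.
  afterTransactionRollback(event: TransactionEventLike): void {
    delete event.queryRunner.data[this.key];
  }
}
```

Giving each subscriber its own key keeps reactions for different entity types apart, since `queryRunner.data` is shared by every subscriber on that query runner. Clearing the key on rollback avoids replaying stale reactions if the same query runner is used again.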

@reyhankim

Replying to @constb's three steps above:

I can confirm this approach works. Thanks for shedding the light on this, @constb!

@nachiketAsurion

Replying to @constb's steps and @reyhankim's confirmation above:

However, event.queryRunner.data is empty for me in afterTransactionCommit.
