Messages are not committed (lost) when using @TransactionalEventListener to send a message in a JPA transaction #1309

Closed
kuldeepkalassts opened this issue Mar 14, 2021 · 21 comments · Fixed by #1310

Comments

@kuldeepkalassts

Background of the code:

To replicate a production scenario, I created a dummy app that saves a record to the DB in a transaction and, in the same method, calls publishEvent; the event listener then sends a message to RabbitMQ.

Classes and usages

The transaction starts in this method:

@Override
	@Transactional
	public EmpDTO createEmployeeInTrans(EmpDTO empDto) {
		return createEmployee(empDto);
	}

This method saves the record in the DB and also triggers publishEvent:

@Override
	public EmpDTO createEmployee(EmpDTO empDTO) {
		
		EmpEntity empEntity = new EmpEntity();
		BeanUtils.copyProperties(empDTO, empEntity);

		System.out.println("<< In Transaction : "+TransactionSynchronizationManager.getCurrentTransactionName()+" >>  Saving data for employee " + empDTO.getEmpCode());

		// Record data into a database
		empEntity = empRepository.save(empEntity);	
		
		// Sending event , this will send the message.
		eventPublisher.publishEvent(new ActivityEvent(empDTO));
		
		return createResponse(empDTO, empEntity);
	}

This is ActivityEvent

import org.springframework.context.ApplicationEvent;
import com.kuldeep.rabbitMQProducer.dto.EmpDTO;
public class ActivityEvent extends ApplicationEvent {
	public ActivityEvent(EmpDTO source) {
		super(source);
	}
}

And this is the TransactionalEventListener for the above event:

	//@Transactional(propagation = Propagation.REQUIRES_NEW)
	@TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT)
	public void onActivitySave(ActivityEvent activityEvent) {
		System.out.println("Activity got event ... Sending message .. ");
		// The DTO travels as the event source (see the ActivityEvent constructor).
		EmpDTO empDTO = (EmpDTO) activityEvent.getSource();
		kRabbitTemplate.convertAndSend(exchange, routingkey, empDTO);
	}

kRabbitTemplate is a bean configured like this:

@Bean
	public RabbitTemplate kRabbitTemplate(ConnectionFactory connectionFactory) {
		final RabbitTemplate kRabbitTemplate = new RabbitTemplate(connectionFactory);
		kRabbitTemplate.setChannelTransacted(true);
		kRabbitTemplate.setMessageConverter(kJsonMessageConverter());
		return kRabbitTemplate;
	}

Problem Definition

When I save a record and send a message to RabbitMQ using the above code flow, my messages are not delivered to the server, which means they are lost.

What I understand about transactions in AMQP:

  1. If the template is transacted but convertAndSend is not called from within a Spring/JPA transaction, messages are committed inside the template's convertAndSend method.
// this is a snippet from org.springframework.amqp.rabbit.core.RabbitTemplate.doSend()
	if (isChannelLocallyTransacted(channel)) {
			// Transacted channel created by this template -> commit.
			RabbitUtils.commitIfNecessary(channel);
		}
  2. But if the template is transacted and convertAndSend is called from within a Spring/JPA transaction, isChannelLocallyTransacted in doSend evaluates to false and the commit is done by the method that initiated the Spring/JPA transaction.

What I found when investigating the message loss in the code above:

  • A Spring transaction was active when I called convertAndSend, so the message was supposed to be committed with the Spring transaction.
  • For that, RabbitTemplate binds the resources and registers the synchronizations before sending the message, in bindResourceToTransaction of org.springframework.amqp.rabbit.connection.ConnectionFactoryUtils.
	public static RabbitResourceHolder bindResourceToTransaction(RabbitResourceHolder resourceHolder,
			ConnectionFactory connectionFactory, boolean synched) {
		if (TransactionSynchronizationManager.hasResource(connectionFactory)
				|| !TransactionSynchronizationManager.isActualTransactionActive() || !synched) {
			return (RabbitResourceHolder) TransactionSynchronizationManager.getResource(connectionFactory); // NOSONAR never null
		}
		TransactionSynchronizationManager.bindResource(connectionFactory, resourceHolder);
		resourceHolder.setSynchronizedWithTransaction(true);
		if (TransactionSynchronizationManager.isSynchronizationActive()) {
			TransactionSynchronizationManager.registerSynchronization(new RabbitResourceSynchronization(resourceHolder,
					connectionFactory));
		}
		return resourceHolder;
	}

In my code, after the resource is bound, registerSynchronization is never called because TransactionSynchronizationManager.isSynchronizationActive() == false. Because no synchronization is registered, the RabbitMQ message is never committed with the Spring transaction: AbstractPlatformTransactionManager.triggerAfterCompletion invokes RabbitMQ's commit only through the registered synchronizations.

Problems I faced because of the above issue:

  • The message was not committed in the Spring transaction, so it was lost.
  • Because the resource was bound in bindResourceToTransaction, it remained bound and prevented a resource from being bound for any other message sent on the same thread.

Possible root cause of TransactionSynchronizationManager.isSynchronizationActive() == false

  • I found that the method which starts the transaction removes the synchronizations in triggerAfterCompletion of org.springframework.transaction.support.AbstractPlatformTransactionManager, because status.isNewSynchronization() evaluates to true after the DB operation (this usually does not happen when I call convertAndSend without an ApplicationEvent).
	private void triggerAfterCompletion(DefaultTransactionStatus status, int completionStatus) {
		if (status.isNewSynchronization()) {
			List<TransactionSynchronization> synchronizations = TransactionSynchronizationManager.getSynchronizations();
			TransactionSynchronizationManager.clearSynchronization();
			if (!status.hasTransaction() || status.isNewTransaction()) {
				if (status.isDebug()) {
					logger.trace("Triggering afterCompletion synchronization");
				}
				// No transaction or new transaction for the current scope ->
				// invoke the afterCompletion callbacks immediately
				invokeAfterCompletion(synchronizations, completionStatus);
			}
			else if (!synchronizations.isEmpty()) {
				// Existing transaction that we participate in, controlled outside
				// of the scope of this Spring transaction manager -> try to register
				// an afterCompletion callback with the existing (JTA) transaction.
				registerAfterCompletionWithExistingTransaction(status.getTransaction(), synchronizations);
			}
		}
	}

What I did to work around this issue

I simply added @Transactional(propagation = Propagation.REQUIRES_NEW) alongside @TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT) on the onActivitySave method, and it worked because a new transaction was started.

What I need to know

  1. Why is status.isNewSynchronization() true in the triggerAfterCompletion method when using ApplicationEvent?
  2. If the transaction was supposed to terminate in the parent method, why did I get TransactionSynchronizationManager.isActualTransactionActive() == true in the listener class?
  3. If the actual transaction is active, was it supposed to remove the synchronization?
  4. In bindResourceToTransaction, does Spring AMQP assume an active transaction without synchronization? If so, why not initialize synchronization when it is not active?
  5. If I am propagating a new transaction then I am losing the parent transaction, is there any better way to do it?

Please help me with this, it is a hot production issue, and I am not very sure about the fix I have done.

@garyrussell
Contributor

I need to investigate, but most likely it's because it is called in the AFTER_COMMIT phase, so there is no transaction at this time.

We certainly shouldn't be leaving the dangling bound resource, though; if there's really no transaction we should use a local transaction instead.

This logic predated @TransactionalEventListener by many years so I guess nobody has hit this before now.

Your work around sounds correct.

> If I am propagating a new transaction then I am losing the parent transaction, is there any better way to do it?

Not sure what you mean by that, there is no "parent" RabbitMQ transaction, only JPA.

I see you also asked this here https://stackoverflow.com/questions/66613863/message-are-not-commited-loss-when-using-transactionaleventlistener-to-send-a

Please don't waste your time and ours asking the same question in multiple places.

@kuldeepkalassts
Author

kuldeepkalassts commented Mar 15, 2021

By parent transaction, I mean the actual transaction, that is, the JPA transaction I initiated by using @Transactional.
In my case:

@Transactional
	public EmpDTO createEmployeeInTrans


I asked on StackOverflow, but I had to ask here as well because I did not get any response or acknowledgment on that question.
Besides, StackOverflow is more of a public forum, so my understanding was that someone outside the Spring AMQP team could answer as well.

@garyrussell
Contributor

To be fair, you asked it there on a weekend; you should give us at least one working day to respond.

The problem is that, with this configuration, we are in a quasi-transactional state: while there is indeed a transaction in progress, the synchronizations have already been cleared because the transaction has already committed.

Using @TransactionalEventListener(phase = TransactionPhase.BEFORE_COMMIT) works.

garyrussell added this to the 2.3.6 milestone Mar 15, 2021
garyrussell self-assigned this Mar 15, 2021
@kuldeepkalassts
Author

I agree with you, and please accept my apologies for being impatient.

I might have a gap in my understanding here: if the transaction is already committed, why do we get TransactionSynchronizationManager.isActualTransactionActive() == true in the after-commit phase (in our method annotated with @TransactionalEventListener)?

So should we consider this kind of transaction an actual active transaction, or should we ignore it and do a local transaction commit?

@garyrussell
Contributor

The transaction is still "active", but we're in the after completion phase and the JPA TM (AbstractPlatformTM) clears the synchronizations before calling their afterCompletion() - so it's too late to register a new synchronization.

	private void triggerAfterCompletion(DefaultTransactionStatus status, int completionStatus) {
		if (status.isNewSynchronization()) {
			List<TransactionSynchronization> synchronizations = TransactionSynchronizationManager.getSynchronizations();
			TransactionSynchronizationManager.clearSynchronization(); // <<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<
			if (!status.hasTransaction() || status.isNewTransaction()) {
				if (status.isDebug()) {
					logger.trace("Triggering afterCompletion synchronization");
				}
				// No transaction or new transaction for the current scope ->
				// invoke the afterCompletion callbacks immediately
				invokeAfterCompletion(synchronizations, completionStatus);
			}
			else if (!synchronizations.isEmpty()) {
				// Existing transaction that we participate in, controlled outside
				// of the scope of this Spring transaction manager -> try to register
				// an afterCompletion callback with the existing (JTA) transaction.
				registerAfterCompletionWithExistingTransaction(status.getTransaction(), synchronizations);
			}
		}
	}
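
The ordering in the snippet above can be reproduced with a small plain-Java toy model. This is only a sketch, not Spring code: `ClearedSynchronizationModel` and its methods are hypothetical stand-ins for `TransactionSynchronizationManager` and `triggerAfterCompletion`, and it assumes nothing beyond what the snippet shows, namely that the synchronizations are cleared before the callbacks are invoked.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model only; none of these names are Spring APIs.
class ClearedSynchronizationModel {

    interface Synchronization {
        void afterCompletion();
    }

    // null stands for "synchronization not active"
    private List<Synchronization> synchronizations;
    final List<String> log = new ArrayList<>();

    void begin() {
        synchronizations = new ArrayList<>();
    }

    boolean isSynchronizationActive() {
        return synchronizations != null;
    }

    // Registers the callback only while synchronization is active.
    boolean register(Synchronization synchronization) {
        if (!isSynchronizationActive()) {
            log.add("registration skipped");
            return false;
        }
        synchronizations.add(synchronization);
        return true;
    }

    // Same order as in the snippet: clear first, then invoke the callbacks.
    void complete() {
        List<Synchronization> snapshot = synchronizations;
        synchronizations = null;
        for (Synchronization synchronization : snapshot) {
            synchronization.afterCompletion();
        }
    }

    static List<String> demo() {
        ClearedSynchronizationModel tx = new ClearedSynchronizationModel();
        tx.begin();
        // Stands in for the AFTER_COMMIT listener that sends a message and
        // then tries to enlist the broker commit with the same transaction.
        tx.register(() -> {
            tx.log.add("listener sends message");
            tx.register(() -> tx.log.add("broker commit"));
        });
        tx.complete();
        return tx.log;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

In this model the inner registration is refused, so the "broker commit" callback never runs, which corresponds to the lost message described in the report.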

I need to think through a solution, but BEFORE_COMMIT should work for you.

@gosachin1

Hi Gary, (I am on the same team as Kuldeep)

Regarding the following statement:
> This logic predated @TransactionalEventListener by many years so I guess nobody has hit this before now.

Are you suggesting that the @TransactionalEventListener we are using is an older pattern? Does this mean we should be using something else?

Thanks

@garyrussell
Contributor

No; I am not suggesting that, I am suggesting that nobody has hit this problem before.

This capability was added to Spring in 2015; the Rabbit transaction logic has been around since 2010.

That just tells me that nobody has used this feature (with AFTER_COMMIT) before you.

@kuldeepkalassts
Author

I tried BEFORE_COMMIT and it worked as expected. We now have two workarounds:

  1. Use BEFORE_COMMIT
  2. @Transactional(propagation = Propagation.REQUIRES_NEW)

I checked how BEFORE_COMMIT works and found the following:

Class AbstractPlatformTransactionManager ...
	private void processCommit(DefaultTransactionStatus status) throws TransactionException {
		try {
			boolean beforeCompletionInvoked = false;

			try {
				boolean unexpectedRollback = false;
				prepareForCommit(status);
				triggerBeforeCommit(status);
				triggerBeforeCompletion(status);
				beforeCompletionInvoked = true;
                                 ...

processCommit calls triggerBeforeCommit, which triggers the ActivityEvent listener (BEFORE_COMMIT) and sends the message.
This means we can use the active transaction while synchronization is still active.

I also understand that Propagation.REQUIRES_NEW starts a fresh transaction.

@garyrussell, what would you suggest in our case: BEFORE_COMMIT or Propagation.REQUIRES_NEW?

@garyrussell
Contributor

The problem is BEFORE_COMMIT will send the message even if the JPA transaction commit() fails afterwards, so the better work around is @Transactional.

However, I wouldn't start a new JPA transaction; use a RabbitTransactionManager there instead, to start just a RabbitMQ transaction rather than using synchronization.

@kuldeepkalassts
Author

> The problem is BEFORE_COMMIT will send the message even if the JPA transaction commit() fails afterwards, so the better work around is @Transactional.

I checked this by deleting the table. At first, processCommit did not call doCommit(status) because status.isNewTransaction() == false. Once all DB operations had completed, processCommit was invoked again; this time it called triggerBeforeCommit(status), which triggered the ActivityEvent listener (BEFORE_COMMIT) and sent the message (without committing it).

After the message was sent, doCommit(status) was called and set the status to rollback. So when triggerAfterCompletion() calls RabbitResourceSynchronization.afterCompletion, the status is rollback and it calls this.resourceHolder.rollbackAll().

In Short, the message will not be committed in this case as well because of DB exception/rollback.
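
The sequence observed above can be summarised in a plain-Java toy model. It is a sketch under stated assumptions, not Spring or Spring AMQP code: `BeforeCommitModel`, `run` and the message names are hypothetical, and the model assumes only the order reported in this comment (listener publishes in the before-commit phase, the database commit follows, and the channel is committed or rolled back according to the final status).

```java
import java.util.ArrayList;
import java.util.List;

// Toy model only; the names are illustrative, not Spring or Spring AMQP APIs.
class BeforeCommitModel {

    // Messages published on the transacted channel but not yet committed.
    private final List<String> pending = new ArrayList<>();
    // Messages the broker would actually deliver.
    final List<String> delivered = new ArrayList<>();

    // Runs one "transaction": listener, database commit, then completion.
    void run(String message, boolean databaseCommitSucceeds) {
        // BEFORE_COMMIT phase: the listener publishes, nothing is committed yet.
        pending.add(message);

        // The database commit happens after the before-commit callbacks.
        boolean committed = databaseCommitSucceeds;

        // After completion: the channel follows the outcome of the transaction.
        if (committed) {
            delivered.addAll(pending);
        }
        // On rollback the pending publish is simply discarded.
        pending.clear();
    }

    static List<String> demo() {
        BeforeCommitModel model = new BeforeCommitModel();
        model.run("employee-1", true);
        model.run("employee-2", false); // simulated database failure
        return model.delivered;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Only the message from the transaction whose database commit succeeded ends up in `delivered`; the one published before the failed commit is dropped.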

And I agree that starting a new JPA transaction is not good. But in our actual framework we do a DB operation plus a message send, so I am not sure about RabbitTransactionManager.

@garyrussell
Contributor

> In Short, the message will not be committed in this case as well because of DB exception/rollback.

In that case, that is the better solution. Starting a new transaction for the record send will be an independent transaction that will commit whether or not the JPA transaction commits.

To answer your last point...

But the rabbit transaction is simply synchronized with the JPA transaction; there is no point in starting a new JPA transaction that does nothing.

Using a RabbitTransactionManager (just for that one @Transactional method) will avoid starting the unnecessary JPA transaction.

But, it seems BEFORE_COMMIT is the best work around until I fix the problem (should be fixed by Wednesday).

@garyrussell
Contributor

Actually, I don't think it's a work-around, it's the solution (using BEFORE_COMMIT).

I can fix the ConnectionFactoryUtils to not bind an orphaned ResourceHolder, but then the local transaction will commit, even if the JPA transaction later rolls back, which is not your desired result.
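
The trade-off in the comment above can be written down as a small decision table. This is a hedged plain-Java sketch, not the actual ConnectionFactoryUtils logic: `BindDecisionModel`, `CommitMode`, `beforeFix` and `afterFix` are hypothetical names, and the model assumes a transacted template and only the behaviour described in this thread.

```java
// Toy model only; CommitMode and both methods are illustrative names.
class BindDecisionModel {

    enum CommitMode { LOCAL, SYNCHRONIZED, ORPHANED }

    // Behaviour described in the report: the holder is bound whenever a
    // transaction is active, even when no synchronization can be registered.
    static CommitMode beforeFix(boolean transactionActive, boolean synchronizationActive) {
        if (!transactionActive) {
            return CommitMode.LOCAL;
        }
        return synchronizationActive ? CommitMode.SYNCHRONIZED : CommitMode.ORPHANED;
    }

    // Behaviour proposed in the comment above: without active synchronization
    // the holder is not bound, so the template commits locally.
    static CommitMode afterFix(boolean transactionActive, boolean synchronizationActive) {
        return (transactionActive && synchronizationActive)
                ? CommitMode.SYNCHRONIZED
                : CommitMode.LOCAL;
    }

    public static void main(String[] args) {
        // AFTER_COMMIT listener: transaction "active", synchronization cleared.
        System.out.println(beforeFix(true, false));
        System.out.println(afterFix(true, false));
    }
}
```

For the AFTER_COMMIT case (transaction active, synchronization cleared) the model moves from an orphaned holder to a local commit, which avoids the lost message but is no longer tied to the outcome of the JPA transaction.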

garyrussell added a commit to garyrussell/spring-amqp that referenced this issue Mar 15, 2021
Resolves spring-projects#1309

If the `RabbitTemplate` was called from a `@TransactionalEventListener`,
with phase `AFTER_COMMIT`, a transaction is "active", but synchronizations
are already cleared. It is too late to synchronize this transaction.

We end up with an orphaned `ResourceHolder` with pending transaction commits.

Don't bind the resource holder if synchronization is not active.

However, the proper solution, if users want to synchronize the rabbit transaction
with the global transaction, is to use the `BEFORE_COMMIT` phase.

See the discussion on the Github issue for more information.

**cherry-pick to 2.2.x**
@kuldeepkalassts
Author

kuldeepkalassts commented Mar 16, 2021

> I can fix the ConnectionFactoryUtils to not bind an orphaned ResourceHolder, but then the local transaction will commit, even if the JPA transaction later rolls back, which is not your desired result.

I agree with this; your fix will ensure that no orphaned resource is bound when TransactionSynchronizationManager.isSynchronizationActive() == false.

But we came up with another solution:

@Override
	@Transactional
	public EmpDTO createEmployee(EmpDTO empDTO) {

		// Record data into a database
		empRepository.save(empDTO);	
		
		// Sending event, this will send the message.
		//eventPublisher.publishEvent(new ActivityEvent(empDTO));
		
		TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronization() {
			@Override
			public void afterCommit() {
				rabbitMQSender.convertAndSend(empDTO);
			}
		});
		
		return createResponse(empDTO);
	}

So now we no longer use eventPublisher.publishEvent; instead we register a new TransactionSynchronization.

This new TransactionSynchronization calls convertAndSend in afterCommit(), where isSynchronizationActive is still true, so the message is sent and committed without any issue.
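
Why a callback registered from afterCommit() can still take effect is sketched below as a plain-Java toy model. It is not Spring code: `AfterCommitCallbackModel` and its methods are hypothetical, and the model assumes that afterCommit callbacks run before the synchronizations are cleared and that anything registered during afterCommit is still picked up for afterCompletion, which matches the observation above that isSynchronizationActive is still true at that point.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model only; none of these names are Spring APIs.
class AfterCommitCallbackModel {

    interface Synchronization {
        default void afterCommit() { }
        default void afterCompletion() { }
    }

    // null stands for "synchronization not active"
    private List<Synchronization> synchronizations = new ArrayList<>();
    final List<String> log = new ArrayList<>();

    boolean register(Synchronization synchronization) {
        if (synchronizations == null) {
            log.add("registration skipped");
            return false;
        }
        synchronizations.add(synchronization);
        return true;
    }

    void commit() {
        // afterCommit runs on a copy while the registry is still active,
        // so callbacks may register further synchronizations.
        for (Synchronization s : new ArrayList<>(synchronizations)) {
            s.afterCommit();
        }
        // afterCompletion: take everything registered so far, clear, invoke.
        List<Synchronization> snapshot = synchronizations;
        synchronizations = null;
        for (Synchronization s : snapshot) {
            s.afterCompletion();
        }
    }

    static List<String> demo() {
        AfterCommitCallbackModel tx = new AfterCommitCallbackModel();
        tx.register(new Synchronization() {
            @Override
            public void afterCommit() {
                tx.log.add("afterCommit sends message");
                // Stands in for the broker-side synchronization that the
                // send would enlist with the surrounding transaction.
                tx.register(new Synchronization() {
                    @Override
                    public void afterCompletion() {
                        tx.log.add("broker commit");
                    }
                });
            }
        });
        tx.commit();
        return tx.log;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

In contrast to a listener that runs only after the registry has been cleared, the registration made inside afterCommit is accepted here and the "broker commit" callback runs.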

artembilan pushed a commit that referenced this issue Mar 16, 2021
Resolves #1309

If the `RabbitTemplate` was called from a `@TransactionalEventListener`,
with phase `AFTER_COMMIT`, a transaction is "active", but synchronizations
are already cleared. It is too late to synchronize this transaction.

We end up with an orphaned `ResourceHolder` with pending transaction commits.

Don't bind the resource holder if synchronization is not active.

However, the proper solution, if users want to synchronize the rabbit transaction
with the global transaction, is to use the `BEFORE_COMMIT` phase.

See the discussion on the Github issue for more information.

**cherry-pick to 2.2.x**

* Fix since version.

* Reset physical close required flag left over from another test.

* Capture test results for all modules.
artembilan pushed a commit that referenced this issue Mar 16, 2021
Resolves #1309

If the `RabbitTemplate` was called from a `@TransactionalEventListener`,
with phase `AFTER_COMMIT`, a transaction is "active", but synchronizations
are already cleared. It is too late to synchronize this transaction.

We end up with an orphaned `ResourceHolder` with pending transaction commits.

Don't bind the resource holder if synchronization is not active.

However, the proper solution, if users want to synchronize the rabbit transaction
with the global transaction, is to use the `BEFORE_COMMIT` phase.

See the discussion on the Github issue for more information.

**cherry-pick to 2.2.x**

* Fix since version.

* Reset physical close required flag left over from another test.

* Capture test results for all modules.
# Conflicts:
#	.github/workflows/pr-build-workflow.yml
#	spring-rabbit/src/test/java/org/springframework/amqp/rabbit/connection/ClientRecoveryCompatibilityTests.java
@kuldeepkalassts
Author

Thanks for the fix @garyrussell and @artembilan, If it is possible, can you help me to get it merged on v2.2.1?

@garyrussell
Contributor

@iav20

iav20 commented Mar 20, 2021

Hi Gary,
I am from the same team as Kuldeep. I just wanted to let you know that the Spring jar version currently in our production is 2.2.1.
Would it be possible to make the current changes available in version 2.2.1? That would save a lot of the regression testing and effort that moving to a newer binary version would introduce.
Alternatively, if you cannot provide the changes in version 2.2.1, can you please guide us to make the change at our end in our production deployed jar?
Regards, Apoorv

@garyrussell
Contributor

@iav20 It doesn't work that way. When a bug is found in, say, 2.2.1, it is fixed in the next patch version, 2.2.2.

The current 2.2.x version is 2.2.16.RELEASE.

> guide us to make the change at our end in our production deployed jar

That is not possible.

You have to upgrade to 2.2.16.

2.2.1 was released well over a year ago; you must keep your versions up-to-date to get the latest fixes and updates. Using old software versions is generally not a good idea.

Installing patch releases is much different to upgrading to, say, 2.3.x, when you would need to do much more testing. Changes in the 2.2.x line are generally just bug fixes.

@iav20

iav20 commented Apr 5, 2021

Hi Gary, so do we need to take the spring-core.2.2.16 jar and spring-rabbit.2.2.16 jar versions for this change?
Regards, Apoorv

@garyrussell
Contributor

spring-amqp 2.2.16.RELEASE; spring-rabbit 2.2.16.RELEASE; spring-core (and all spring framework dependencies) 5.2.13.RELEASE.

When using maven/gradle, it's best to just import spring-rabbit and let it bring in all the correct dependency versions.

@iav20

iav20 commented Apr 5, 2021

Hi Gary, we currently use the Spring family of jars (version 5.2.1) throughout our codebase. Upgrading all of them is again a major effort.
So I just wanted to understand: if we upgrade only to spring-amqp 2.2.16.RELEASE and spring-rabbit 2.2.16.RELEASE and keep the remaining Spring jars at their current version (5.2.1), is there any impact?
If there is any impact, please let us know, as it will help us scope the effort of this binary version upgrade.
Best Regards,Apoorv

@garyrussell
Contributor

I am not aware of any issues you might have but, once again, you should really keep up-to-date on versions. Upgrading spring jars within the 5.2.x line should not cause you any issues; they only contain bug fixes. The latest 5.2.x is 5.2.13 and that is the version we test spring-rabbit 2.2.16 against.
