From 9c16c8865b617f01245d61a042ad5873a8c89cf1 Mon Sep 17 00:00:00 2001 From: guqing Date: Sun, 4 Feb 2024 17:04:12 +0800 Subject: [PATCH 1/3] fix: failure in data query due to reconciler triggered by uncommitted transaction --- .../halo/app/config/HaloConfiguration.java | 7 +++++ .../ReactiveExtensionClientImpl.java | 26 +++++++++++++------ .../ReactiveExtensionClientTest.java | 6 +++++ 3 files changed, 31 insertions(+), 8 deletions(-) diff --git a/application/src/main/java/run/halo/app/config/HaloConfiguration.java b/application/src/main/java/run/halo/app/config/HaloConfiguration.java index ef7f8a2bca..26dbda24a3 100644 --- a/application/src/main/java/run/halo/app/config/HaloConfiguration.java +++ b/application/src/main/java/run/halo/app/config/HaloConfiguration.java @@ -6,6 +6,8 @@ import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.scheduling.annotation.EnableAsync; +import org.springframework.transaction.ReactiveTransactionManager; +import org.springframework.transaction.reactive.TransactionalOperator; @Configuration(proxyBeanMethods = false) @EnableAsync @@ -18,4 +20,9 @@ Jackson2ObjectMapperBuilderCustomizer objectMapperCustomizer() { builder.featuresToEnable(MapperFeature.ACCEPT_CASE_INSENSITIVE_ENUMS); }; } + + @Bean + TransactionalOperator transactionTemplate(ReactiveTransactionManager transactionManager) { + return TransactionalOperator.create(transactionManager); + } } \ No newline at end of file diff --git a/application/src/main/java/run/halo/app/extension/ReactiveExtensionClientImpl.java b/application/src/main/java/run/halo/app/extension/ReactiveExtensionClientImpl.java index 2bbe08ad0b..2e13e1c5e6 100644 --- a/application/src/main/java/run/halo/app/extension/ReactiveExtensionClientImpl.java +++ b/application/src/main/java/run/halo/app/extension/ReactiveExtensionClientImpl.java @@ -26,7 +26,7 @@ import org.springframework.data.domain.Sort; import org.springframework.data.util.Predicates; import org.springframework.stereotype.Component; -import org.springframework.transaction.annotation.Transactional; +import org.springframework.transaction.reactive.TransactionalOperator; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.util.retry.Retry; @@ -57,6 +57,8 @@ public class ReactiveExtensionClientImpl implements ReactiveExtensionClient { private final IndexedQueryEngine indexedQueryEngine; + private final TransactionalOperator transactionalOperator; + private final ConcurrentMap indexBuildingState = new ConcurrentHashMap<>(); @@ -151,7 +153,6 @@ private Mono get(GroupVersionKind gvk, String name) { } @Override - @Transactional public Mono create(E extension) { checkClientWritable(extension); return Mono.just(extension) @@ -176,6 +177,9 @@ public Mono create(E extension) { .map(converter::convertTo) .flatMap(extStore -> doCreate(extension, extStore.getName(), extStore.getData()) .doOnNext(watchers::onAdd) + .doOnNext(e -> { + System.out.println(e); + }) ) .retryWhen(Retry.backoff(3, Duration.ofMillis(100)) // retry when generateName is set @@ -185,7 +189,6 @@ && hasText(extension.getMetadata().getGenerateName())) } @Override - @Transactional public Mono update(E extension) { checkClientWritable(extension); // Refactor the atomic reference if we have a better solution. @@ -223,7 +226,6 @@ private Mono getLatest(Extension extension) { } @Override - @Transactional public Mono delete(E extension) { checkClientWritable(extension); // set deletionTimestamp @@ -246,8 +248,12 @@ Mono doCreate(E oldExtension, String name, byte[] data) var type = (Class) oldExtension.getClass(); var indexer = indexerFactory.getIndexer(gvk); return client.create(name, data) - .map(created -> converter.convertFrom(type, created)) - .doOnNext(indexer::indexRecord); + .map(created -> { + E result = converter.convertFrom(type, created); + indexer.indexRecord(result); + return result; + }) + .as(transactionalOperator::transactional); }); } @@ -257,8 +263,12 @@ Mono doUpdate(E oldExtension, String name, Long version var type = (Class) oldExtension.getClass(); var indexer = indexerFactory.getIndexer(oldExtension.groupVersionKind()); return client.update(name, version, data) - .map(updated -> converter.convertFrom(type, updated)) - .doOnNext(indexer::updateRecord); + .map(updated -> { + E result = converter.convertFrom(type, updated); + indexer.updateRecord(result); + return result; + }) + .as(transactionalOperator::transactional); }); } diff --git a/application/src/test/java/run/halo/app/extension/ReactiveExtensionClientTest.java b/application/src/test/java/run/halo/app/extension/ReactiveExtensionClientTest.java index dc7cd9528b..66f5f65139 100644 --- a/application/src/test/java/run/halo/app/extension/ReactiveExtensionClientTest.java +++ b/application/src/test/java/run/halo/app/extension/ReactiveExtensionClientTest.java @@ -36,6 +36,7 @@ import org.mockito.Spy; import org.mockito.junit.jupiter.MockitoExtension; import org.springframework.dao.DataIntegrityViolationException; +import org.springframework.transaction.reactive.TransactionalOperator; import reactor.core.Exceptions; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; @@ -63,6 +64,9 @@ class ReactiveExtensionClientTest { @Mock IndexerFactory indexerFactory; + @Mock + TransactionalOperator transactionalOperator; + @Spy ObjectMapper objectMapper = JsonMapper.builder() .addModule(new JavaTimeModule()) @@ -76,6 +80,8 @@ void setUp() { lenient().when(schemeManager.get(eq(FakeExtension.class))) .thenReturn(fakeScheme); lenient().when(schemeManager.get(eq(fakeScheme.groupVersionKind()))).thenReturn(fakeScheme); + lenient().when(transactionalOperator.transactional(any(Mono.class))) + .thenAnswer(invocation -> invocation.getArgument(0)); } FakeExtension createFakeExtension(String name, Long version) { From d7a976d76e84a4f914594883d7fd08761b14c87b Mon Sep 17 00:00:00 2001 From: guqing Date: Mon, 5 Feb 2024 10:26:12 +0800 Subject: [PATCH 2/3] chore: remove unused code for testing --- .../extension/ReactiveExtensionClientImpl.java | 17 ++++------------- 1 file changed, 4 insertions(+), 13 deletions(-) diff --git a/application/src/main/java/run/halo/app/extension/ReactiveExtensionClientImpl.java b/application/src/main/java/run/halo/app/extension/ReactiveExtensionClientImpl.java index 2e13e1c5e6..3456511981 100644 --- a/application/src/main/java/run/halo/app/extension/ReactiveExtensionClientImpl.java +++ b/application/src/main/java/run/halo/app/extension/ReactiveExtensionClientImpl.java @@ -177,9 +177,6 @@ public Mono create(E extension) { .map(converter::convertTo) .flatMap(extStore -> doCreate(extension, extStore.getName(), extStore.getData()) .doOnNext(watchers::onAdd) - .doOnNext(e -> { - System.out.println(e); - }) ) .retryWhen(Retry.backoff(3, Duration.ofMillis(100)) // retry when generateName is set @@ -248,11 +245,8 @@ Mono doCreate(E oldExtension, String name, byte[] data) var type = (Class) oldExtension.getClass(); var indexer = indexerFactory.getIndexer(gvk); return client.create(name, data) - .map(created -> { - E result = converter.convertFrom(type, created); - indexer.indexRecord(result); - return result; - }) + .map(created -> converter.convertFrom(type, created)) + .doOnNext(indexer::indexRecord) .as(transactionalOperator::transactional); }); } @@ -263,11 +257,8 @@ Mono doUpdate(E oldExtension, String name, Long version var type = (Class) oldExtension.getClass(); var indexer = indexerFactory.getIndexer(oldExtension.groupVersionKind()); return client.update(name, version, data) - .map(updated -> { - E result = converter.convertFrom(type, updated); - indexer.updateRecord(result); - return result; - }) + .map(updated -> converter.convertFrom(type, updated)) + .doOnNext(indexer::updateRecord) .as(transactionalOperator::transactional); }); } From 7ecd23a2bb0f4e0a62ecb89fdfe4015ce8645d82 Mon Sep 17 00:00:00 2001 From: guqing Date: Mon, 5 Feb 2024 10:59:06 +0800 Subject: [PATCH 3/3] refactor: transactional operator --- .../halo/app/config/HaloConfiguration.java | 7 ----- .../ReactiveExtensionClientImpl.java | 26 ++++++++++++++++--- .../ReactiveExtensionClientTest.java | 5 +++- 3 files changed, 27 insertions(+), 11 deletions(-) diff --git a/application/src/main/java/run/halo/app/config/HaloConfiguration.java b/application/src/main/java/run/halo/app/config/HaloConfiguration.java index 26dbda24a3..ef7f8a2bca 100644 --- a/application/src/main/java/run/halo/app/config/HaloConfiguration.java +++ b/application/src/main/java/run/halo/app/config/HaloConfiguration.java @@ -6,8 +6,6 @@ import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.scheduling.annotation.EnableAsync; -import org.springframework.transaction.ReactiveTransactionManager; -import org.springframework.transaction.reactive.TransactionalOperator; @Configuration(proxyBeanMethods = false) @EnableAsync @@ -20,9 +18,4 @@ Jackson2ObjectMapperBuilderCustomizer objectMapperCustomizer() { builder.featuresToEnable(MapperFeature.ACCEPT_CASE_INSENSITIVE_ENUMS); }; } - - @Bean - TransactionalOperator transactionTemplate(ReactiveTransactionManager transactionManager) { - return TransactionalOperator.create(transactionManager); - } } \ No newline at end of file diff --git a/application/src/main/java/run/halo/app/extension/ReactiveExtensionClientImpl.java b/application/src/main/java/run/halo/app/extension/ReactiveExtensionClientImpl.java index 3456511981..c821e4491d 100644 --- a/application/src/main/java/run/halo/app/extension/ReactiveExtensionClientImpl.java +++ b/application/src/main/java/run/halo/app/extension/ReactiveExtensionClientImpl.java @@ -26,6 +26,7 @@ import org.springframework.data.domain.Sort; import org.springframework.data.util.Predicates; import org.springframework.stereotype.Component; +import org.springframework.transaction.ReactiveTransactionManager; import org.springframework.transaction.reactive.TransactionalOperator; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; @@ -40,7 +41,6 @@ @Slf4j @Component -@RequiredArgsConstructor public class ReactiveExtensionClientImpl implements ReactiveExtensionClient { private final ReactiveExtensionStoreClient client; @@ -57,11 +57,31 @@ public class ReactiveExtensionClientImpl implements ReactiveExtensionClient { private final IndexedQueryEngine indexedQueryEngine; - private final TransactionalOperator transactionalOperator; - private final ConcurrentMap indexBuildingState = new ConcurrentHashMap<>(); + private TransactionalOperator transactionalOperator; + + public ReactiveExtensionClientImpl(ReactiveExtensionStoreClient client, + ExtensionConverter converter, SchemeManager schemeManager, ObjectMapper objectMapper, + IndexerFactory indexerFactory, IndexedQueryEngine indexedQueryEngine, + ReactiveTransactionManager reactiveTransactionManager) { + this.client = client; + this.converter = converter; + this.schemeManager = schemeManager; + this.objectMapper = objectMapper; + this.indexerFactory = indexerFactory; + this.indexedQueryEngine = indexedQueryEngine; + this.transactionalOperator = TransactionalOperator.create(reactiveTransactionManager); + } + + /** + * Only for test. + */ + void setTransactionalOperator(TransactionalOperator transactionalOperator) { + this.transactionalOperator = transactionalOperator; + } + @Override public Flux list(Class type, Predicate predicate, Comparator comparator) { diff --git a/application/src/test/java/run/halo/app/extension/ReactiveExtensionClientTest.java b/application/src/test/java/run/halo/app/extension/ReactiveExtensionClientTest.java index 66f5f65139..c46e37700c 100644 --- a/application/src/test/java/run/halo/app/extension/ReactiveExtensionClientTest.java +++ b/application/src/test/java/run/halo/app/extension/ReactiveExtensionClientTest.java @@ -36,6 +36,7 @@ import org.mockito.Spy; import org.mockito.junit.jupiter.MockitoExtension; import org.springframework.dao.DataIntegrityViolationException; +import org.springframework.transaction.ReactiveTransactionManager; import org.springframework.transaction.reactive.TransactionalOperator; import reactor.core.Exceptions; import reactor.core.publisher.Flux; @@ -65,7 +66,7 @@ class ReactiveExtensionClientTest { IndexerFactory indexerFactory; @Mock - TransactionalOperator transactionalOperator; + ReactiveTransactionManager reactiveTransactionManager; @Spy ObjectMapper objectMapper = JsonMapper.builder() @@ -80,6 +81,8 @@ void setUp() { lenient().when(schemeManager.get(eq(FakeExtension.class))) .thenReturn(fakeScheme); lenient().when(schemeManager.get(eq(fakeScheme.groupVersionKind()))).thenReturn(fakeScheme); + var transactionalOperator = mock(TransactionalOperator.class); + client.setTransactionalOperator(transactionalOperator); lenient().when(transactionalOperator.transactional(any(Mono.class))) .thenAnswer(invocation -> invocation.getArgument(0)); }