From 6aaf48f242dcaa7f1467ddfc21e7c26320c82355 Mon Sep 17 00:00:00 2001 From: Pierre De Rop Date: Thu, 22 Jun 2023 17:13:29 +0200 Subject: [PATCH 1/9] Optimize pool warmup --- .../java/reactor/pool/SimpleDequePool.java | 21 +- .../java/reactor/pool/CommonPoolTest.java | 12 +- .../java/reactor/pool/PoolWarmupTest.java | 227 ++++++++++++++++++ .../src/test/java/reactor/pool/TestUtils.java | 50 ++-- 4 files changed, 274 insertions(+), 36 deletions(-) create mode 100644 reactor-pool/src/test/java/reactor/pool/PoolWarmupTest.java diff --git a/reactor-pool/src/main/java/reactor/pool/SimpleDequePool.java b/reactor-pool/src/main/java/reactor/pool/SimpleDequePool.java index 38b85f63..3fdfe1d0 100644 --- a/reactor-pool/src/main/java/reactor/pool/SimpleDequePool.java +++ b/reactor-pool/src/main/java/reactor/pool/SimpleDequePool.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2018-2022 VMware Inc. or its affiliates, All Rights Reserved. + * Copyright (c) 2018-2023 VMware Inc. or its affiliates, All Rights Reserved. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -249,7 +249,8 @@ public Mono warmup() { .returnPermits(1); }); } - return Flux.concat(allWarmups) + // merge will eagerly subscribe to all warmups from the current thread + return Flux.merge(allWarmups.length, allWarmups) .reduce(0, (count, p) -> count + 1); }); } @@ -442,13 +443,14 @@ else if (sig.isOnError()) { logger.debug("should warm up {} extra resources", toWarmup); final long startWarmupIteration = clock.millis(); - Flux warmupFlux = Flux.range(1, toWarmup) + Flux warmupFlux = Flux.range(1, toWarmup) //individual warmup failures decrement the permit and are logged .flatMap(i -> warmupMono(i, toWarmup, startWarmupIteration, allocator)); - primary.onErrorResume(e -> Mono.empty()) - .thenMany(warmupFlux) - .subscribe(aVoid -> { }, alreadyPropagatedOrLogged -> drain(), this::drain); + // merge will eagerly subscribe to the primary and to all warmupFlux from the current thread. + Flux.merge(toWarmup + 1, primary, warmupFlux) + .onErrorResume(e -> Mono.empty()) + .subscribe(poolable -> drain(), alreadyPropagatedOrLogged -> drain(), () -> drain()); } } } @@ -469,7 +471,7 @@ private Mono allocatorWithScheduler() { return poolConfig.allocator(); } - Mono warmupMono(int index, int max, long startWarmupIteration, Mono allocator) { + Mono warmupMono(int index, int max, long startWarmupIteration, Mono allocator) { return allocator.flatMap(poolable -> { logger.debug("warmed up extra resource {}/{}", index, max); metricsRecorder.recordAllocationSuccessAndLatency( @@ -479,9 +481,10 @@ Mono warmupMono(int index, int max, long startWarmupIteration, Mono tempRef = createSlot(poolable); tempRef.markDestroy(); - return destroyPoolable(tempRef); + return destroyPoolable(tempRef) + .then(Mono.empty()); } - return Mono.empty(); + return Mono.just(poolable); }).onErrorResume(warmupError -> { logger.debug("failed to warm up extra resource {}/{}: {}", index, max, warmupError.toString()); diff --git a/reactor-pool/src/test/java/reactor/pool/CommonPoolTest.java b/reactor-pool/src/test/java/reactor/pool/CommonPoolTest.java index 185b26fc..b7f86abe 100644 --- a/reactor-pool/src/test/java/reactor/pool/CommonPoolTest.java +++ b/reactor-pool/src/test/java/reactor/pool/CommonPoolTest.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2019-2022 VMware Inc. or its affiliates, All Rights Reserved. + * Copyright (c) 2019-2023 VMware Inc. or its affiliates, All Rights Reserved. 
* * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -1401,10 +1401,18 @@ void recordsAllocationLatenciesInWarmup(PoolStyle configAdjuster) { .clock(recorder.getClock()); AbstractPool pool = configAdjuster.apply(builder); + // warmup will eagerly subscribe 10 times to the allocator. + // The five first subscribtions will success (after around 100 millis), and some allocation should fail after around + // 200 millis. assertThatIllegalStateException() .isThrownBy(() -> pool.warmup().block()); - assertThat(recorder.getAllocationTotalCount()).isEqualTo(2); + // at least 5 allocation should be successful + assertThat(recorder.getAllocationSuccessCount()).isEqualTo(5); + // at least 1 allocation should have failed + assertThat(recorder.getAllocationErrorCount()).isGreaterThanOrEqualTo(1); + // at least 6 allocations should have taken place + assertThat(recorder.getAllocationTotalCount()).isGreaterThanOrEqualTo(6); long minSuccess = recorder.getAllocationSuccessHistogram().getMinValue(); long minError = recorder.getAllocationErrorHistogram().getMinValue(); diff --git a/reactor-pool/src/test/java/reactor/pool/PoolWarmupTest.java b/reactor-pool/src/test/java/reactor/pool/PoolWarmupTest.java new file mode 100644 index 00000000..fc415087 --- /dev/null +++ b/reactor-pool/src/test/java/reactor/pool/PoolWarmupTest.java @@ -0,0 +1,227 @@ +/* + * Copyright (c) 2019-2023 VMware Inc. or its affiliates, All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package reactor.pool; + +import org.assertj.core.data.Offset; +import org.junit.jupiter.params.provider.MethodSource; +import reactor.core.publisher.Flux; +import reactor.core.publisher.FluxSink; +import reactor.core.publisher.Mono; +import reactor.core.scheduler.Schedulers; +import reactor.pool.TestUtils.ParameterizedTestWithName; +import reactor.util.Logger; +import reactor.util.Loggers; + +import java.time.Duration; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.Executor; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.Collectors; +import java.util.stream.IntStream; +import java.util.stream.Stream; + +import static org.assertj.core.api.Assertions.assertThat; + +/** + * This test attempts to reproduce a problem reported in + * https://github.com/reactor/reactor-netty/issues/2781. + * + *

+ * More specifically, this test reproduces the sample scenario described here: + * https://github.com/r2dbc/r2dbc-pool/issues/190#issuecomment-1520166774: + * during warmup, some or all DBConnections may use the same TcpResource EventLoop thread + * (if minResources == maxResources). + * + *

+ * In a nutshell, the tested scenario is the following: + * + *

    + *
+ * <ul>
+ *   <li>we have a DBConnectionPool that is internally using an InstrumentedPool for reactive DBConnection pooling</li>
+ *   <li>a DBConnection, once acquired, allows simulating the sending of an SQL request (findAll)</li>
+ *   <li>like in reactor-netty and r2dbc, when a DBConnection is created, it will use a dedicated thread
+ *       to send SQL requests on the DBConnection, unless the current thread is already a DBConnection thread,
+ *       in which case the current DBConnection thread will be used: this is similar to the "colocated"
+ *       TcpResource EventLoops in reactor-netty</li>
+ * </ul>
+ * + * @author Pierre De Rop + */ +public class PoolWarmupTest { + /** + * The test will be run twice: with InstrumentedPool warmups, and without it. + * If warmup is not done, then the warmup will be internally done + * lazily when the first acquire is taking place because the allocation strategy is + * configured with minResources == maxResources. + */ + static List warmupOptions() { return Arrays.asList(Boolean.FALSE, Boolean.TRUE); } + + static final Logger LOGGER = Loggers.getLogger(PoolWarmupTest.class); + + /** + * Each DBConnection will use one of the following DBConnection Executor + */ + final static class DBConnectionThread implements Executor { + final static ThreadLocal current = ThreadLocal.withInitial(() -> null); + + final ExecutorService dbThread; + + DBConnectionThread(String name) { + dbThread = Executors.newSingleThreadExecutor(r -> new Thread(() -> { + current.set(DBConnectionThread.this); + r.run(); + }, name)); + } + + void stop() { + dbThread.shutdown(); + try { + if (! dbThread.awaitTermination(30, TimeUnit.SECONDS)) { + throw new IllegalStateException("Could not stop db thread timely."); + } + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + + @Override + public void execute(Runnable command) { + dbThread.execute(command); + } + } + + /** + * A DBConnection simulates an SQL "findAll" request, which is executed + * through a DBConnectionThread executor. + */ + final static class DBConnection { + final DBConnectionThread dbThread; + + public DBConnection(DBConnectionThread dbThread) { + this.dbThread = dbThread; + } + + Flux findAll() { + return Flux.create(sink -> sink.onRequest(value -> { + if (dbThread == null) { + // publish from the current thread, which is assumed to be a DBConnectionThread. + doPublish(sink); + } else { + // publish through our db thread executor + dbThread.execute(() -> doPublish(sink)); + } + })); + } + + public void doPublish(FluxSink sink) { + for (int i = 0; i < 1000; i ++) { + try { + Thread.sleep(1); + } catch (InterruptedException ignored) {} + sink.next("table entry - " + i); + } + sink.complete(); + } + } + + /** + * A DBConnection Pool, based on Reactor-Pool "InstrumentedPool", configured with minResources == maxResources + */ + final static class DBConnectionPool { + final int poolSize; + final DBConnectionThread[] dbThreads; + final InstrumentedPool pool; + final static AtomicInteger roundRobin = new AtomicInteger(); + + DBConnectionPool(int poolSize) { + this.poolSize = poolSize; + this.dbThreads = new DBConnectionThread[poolSize]; + IntStream.range(0, poolSize).forEach(i -> dbThreads[i] = new DBConnectionThread("dbthread-" + i)); + + pool = PoolBuilder + .from(Mono.defer(() -> { + // if the current thread is already one of our DB thread, then DBConnection.findAll will use + // this current thread, else, let's select one in a round-robin way. + DBConnectionThread dbThread = DBConnectionThread.current.get(); + dbThread = dbThread == null ? + dbThreads[(roundRobin.incrementAndGet() & 0x7F_FF_FF_FF) % dbThreads.length] : dbThread; + return Mono.just(new DBConnection(dbThread)) + .delayElement(Duration.ofMillis(10)) // simulate Database handshaking (authentication, etc ...) 
+ .publishOn(Schedulers.fromExecutor(dbThread)); + })) + .sizeBetween(10, 10) + .idleResourceReuseOrder(false) + .buildPool(); + } + + InstrumentedPool getPool() { + return pool; + } + + void stop() { + Stream.of(dbThreads).forEach(DBConnectionThread::stop); + } + } + + @ParameterizedTestWithName + @MethodSource("warmupOptions") + void testReactorNetty_2781(boolean doWarmup) { + int poolSize = 10; + DBConnectionPool dbConnectionPool = new DBConnectionPool(poolSize); + + try { + InstrumentedPool pool = dbConnectionPool.getPool(); + if (doWarmup) { + pool.warmup().block(); + } + + long startTime = System.currentTimeMillis(); + + List> fluxes = IntStream.rangeClosed(1, poolSize) + .mapToObj(i -> Flux.from(pool.withPoolable(DBConnection::findAll) + .doOnComplete(() -> LOGGER.info(": " + i + "-findAll done")))) + .collect(Collectors.toList()); + + List> next = new ArrayList<>(); + for (Flux flux : fluxes) { + next.add(flux.count().doOnNext(number -> LOGGER.info("num:" + number))); + } + + Flux.fromIterable(next) + .flatMap(x -> x) // will prefetch all fluxes + .collectList() + .block(Duration.ofSeconds(60)); + + long elapsed = (System.currentTimeMillis() - startTime); + LOGGER.info("Elapsed time: " + elapsed); + + // each "dbConnection.findAll()" should take around 1000 millis, we have + // 10 subscriptions, but we expect subscriptions to be served concurrently using + // our 10 DBConnections threads from the pool ... So the elapsed time should be around 1000 millis, not 10 000 millis ! + assertThat(elapsed).isCloseTo(1000, Offset.offset(3000L)); + } + + finally { + dbConnectionPool.stop(); + } + } +} + diff --git a/reactor-pool/src/test/java/reactor/pool/TestUtils.java b/reactor-pool/src/test/java/reactor/pool/TestUtils.java index 42ff9cab..38bdb286 100644 --- a/reactor-pool/src/test/java/reactor/pool/TestUtils.java +++ b/reactor-pool/src/test/java/reactor/pool/TestUtils.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2019-2022 VMware Inc. or its affiliates, All Rights Reserved. + * Copyright (c) 2019-2023 VMware Inc. or its affiliates, All Rights Reserved. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. 
@@ -192,110 +192,110 @@ public Clock getClock() { * @param startTimeMillis the starting time, as measured by {@link #getClock() the Clock's} {@link Clock#millis() millis()} * @return the elapsed time */ - public long measureTime(long startTimeMillis) { + public synchronized long measureTime(long startTimeMillis) { final long l = clock.millis() - startTimeMillis; if (l <= 0) return 1; return l; } @Override - public void recordAllocationSuccessAndLatency(long latencyMs) { + public synchronized void recordAllocationSuccessAndLatency(long latencyMs) { allocationSuccessHistogram.recordValue(latencyMs); } @Override - public void recordAllocationFailureAndLatency(long latencyMs) { + public synchronized void recordAllocationFailureAndLatency(long latencyMs) { allocationErrorHistogram.recordValue(latencyMs); } @Override - public void recordResetLatency(long latencyMs) { + public synchronized void recordResetLatency(long latencyMs) { resetHistogram.recordValue(latencyMs); } @Override - public void recordDestroyLatency(long latencyMs) { + public synchronized void recordDestroyLatency(long latencyMs) { destroyHistogram.recordValue(latencyMs); } @Override - public void recordRecycled() { + public synchronized void recordRecycled() { recycledCounter.increment(); } @Override - public void recordLifetimeDuration(long millisecondsSinceAllocation) { + public synchronized void recordLifetimeDuration(long millisecondsSinceAllocation) { this.lifetimeHistogram.recordValue(millisecondsSinceAllocation); } @Override - public void recordIdleTime(long millisecondsIdle) { + public synchronized void recordIdleTime(long millisecondsIdle) { this.idleTimeHistogram.recordValue(millisecondsIdle); } @Override - public void recordSlowPath() { + public synchronized void recordSlowPath() { this.slowPathCounter.increment(); } @Override - public void recordFastPath() { + public synchronized void recordFastPath() { this.fastPathCounter.increment(); } - public long getAllocationTotalCount() { + public synchronized long getAllocationTotalCount() { return allocationSuccessHistogram.getTotalCount() + allocationErrorHistogram.getTotalCount(); } - public long getAllocationSuccessCount() { + public synchronized long getAllocationSuccessCount() { return allocationSuccessHistogram.getTotalCount(); } - public long getAllocationErrorCount() { + public synchronized long getAllocationErrorCount() { return allocationErrorHistogram.getTotalCount(); } - public long getResetCount() { + public synchronized long getResetCount() { return resetHistogram.getTotalCount(); } - public long getDestroyCount() { + public synchronized long getDestroyCount() { return destroyHistogram.getTotalCount(); } - public long getRecycledCount() { + public synchronized long getRecycledCount() { return recycledCounter.sum(); } - public ShortCountsHistogram getAllocationSuccessHistogram() { + public synchronized ShortCountsHistogram getAllocationSuccessHistogram() { return allocationSuccessHistogram; } - public ShortCountsHistogram getAllocationErrorHistogram() { + public synchronized ShortCountsHistogram getAllocationErrorHistogram() { return allocationErrorHistogram; } - public ShortCountsHistogram getResetHistogram() { + public synchronized ShortCountsHistogram getResetHistogram() { return resetHistogram; } - public ShortCountsHistogram getDestroyHistogram() { + public synchronized ShortCountsHistogram getDestroyHistogram() { return destroyHistogram; } - public Histogram getLifetimeHistogram() { + public synchronized Histogram getLifetimeHistogram() { return 
lifetimeHistogram; } - public Histogram getIdleTimeHistogram() { + public synchronized Histogram getIdleTimeHistogram() { return idleTimeHistogram; } - public long getFastPathCount() { + public synchronized long getFastPathCount() { return fastPathCounter.sum(); } - public long getSlowPathCount() { + public synchronized long getSlowPathCount() { return slowPathCounter.sum(); } } From dea28bab21a0c8bd40f4c1eec0016d1a6acb22d1 Mon Sep 17 00:00:00 2001 From: Pierre De Rop Date: Thu, 22 Jun 2023 19:33:20 +0200 Subject: [PATCH 2/9] Ensure borrowers are always scheduled throw the acquisition scheduler, if any are configured. --- reactor-pool/src/main/java/reactor/pool/SimpleDequePool.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/reactor-pool/src/main/java/reactor/pool/SimpleDequePool.java b/reactor-pool/src/main/java/reactor/pool/SimpleDequePool.java index 3fdfe1d0..a9c253f0 100644 --- a/reactor-pool/src/main/java/reactor/pool/SimpleDequePool.java +++ b/reactor-pool/src/main/java/reactor/pool/SimpleDequePool.java @@ -419,7 +419,8 @@ private void drainLoop() { assert newInstance != null; ACQUIRED.incrementAndGet(this); metricsRecorder.recordAllocationSuccessAndLatency(clock.millis() - start); - borrower.deliver(createSlot(newInstance)); + poolConfig.acquisitionScheduler() + .schedule(() -> borrower.deliver(createSlot(newInstance))); } else if (sig.isOnError()) { Throwable error = sig.getThrowable(); From 17e02262cc991d5627d65c23489000f3e130d358 Mon Sep 17 00:00:00 2001 From: Pierre De Rop Date: Thu, 22 Jun 2023 23:28:50 +0200 Subject: [PATCH 3/9] Polish the PoolWarmupTest --- .../java/reactor/pool/PoolWarmupTest.java | 40 ++++++++++--------- 1 file changed, 21 insertions(+), 19 deletions(-) diff --git a/reactor-pool/src/test/java/reactor/pool/PoolWarmupTest.java b/reactor-pool/src/test/java/reactor/pool/PoolWarmupTest.java index fc415087..ed8b7d5c 100644 --- a/reactor-pool/src/test/java/reactor/pool/PoolWarmupTest.java +++ b/reactor-pool/src/test/java/reactor/pool/PoolWarmupTest.java @@ -26,6 +26,7 @@ import reactor.util.Logger; import reactor.util.Loggers; +import java.io.Closeable; import java.time.Duration; import java.util.ArrayList; import java.util.Arrays; @@ -112,7 +113,7 @@ public void execute(Runnable command) { * A DBConnection simulates an SQL "findAll" request, which is executed * through a DBConnectionThread executor. */ - final static class DBConnection { + final static class DBConnection implements Closeable { final DBConnectionThread dbThread; public DBConnection(DBConnectionThread dbThread) { @@ -120,26 +121,26 @@ public DBConnection(DBConnectionThread dbThread) { } Flux findAll() { - return Flux.create(sink -> sink.onRequest(value -> { - if (dbThread == null) { - // publish from the current thread, which is assumed to be a DBConnectionThread. 
- doPublish(sink); - } else { - // publish through our db thread executor - dbThread.execute(() -> doPublish(sink)); - } - })); + return Flux.create(sink -> sink.onRequest(value -> dbThread.execute(() -> doPublish(sink)))); } - public void doPublish(FluxSink sink) { - for (int i = 0; i < 1000; i ++) { - try { - Thread.sleep(1); - } catch (InterruptedException ignored) {} - sink.next("table entry - " + i); - } + void doPublish(FluxSink sink) { + IntStream.range(0, 1000) + .peek(i -> sleep(1L)) + .forEach(value -> sink.next("table entry - " + value)); sink.complete(); } + + void sleep(long millis) { + try { + Thread.sleep(1); + } catch (InterruptedException ignored) {} + } + + @Override + public void close() { + dbThread.stop(); + } } /** @@ -177,13 +178,14 @@ InstrumentedPool getPool() { } void stop() { + pool.disposeLater().block(Duration.ofSeconds(30)); Stream.of(dbThreads).forEach(DBConnectionThread::stop); } } @ParameterizedTestWithName @MethodSource("warmupOptions") - void testReactorNetty_2781(boolean doWarmup) { + void warmupTest(boolean doWarmup) { int poolSize = 10; DBConnectionPool dbConnectionPool = new DBConnectionPool(poolSize); @@ -206,7 +208,7 @@ void testReactorNetty_2781(boolean doWarmup) { } Flux.fromIterable(next) - .flatMap(x -> x) // will prefetch all fluxes + .flatMap(x -> x) .collectList() .block(Duration.ofSeconds(60)); From 4f6f4fea016b180359fae06fd2d80e448206d88f Mon Sep 17 00:00:00 2001 From: Pierre De Rop Date: Thu, 22 Jun 2023 23:59:04 +0200 Subject: [PATCH 4/9] Last polishing applied to PoolWarmupTest --- .../java/reactor/pool/PoolWarmupTest.java | 27 ++++--------------- 1 file changed, 5 insertions(+), 22 deletions(-) diff --git a/reactor-pool/src/test/java/reactor/pool/PoolWarmupTest.java b/reactor-pool/src/test/java/reactor/pool/PoolWarmupTest.java index ed8b7d5c..78de331c 100644 --- a/reactor-pool/src/test/java/reactor/pool/PoolWarmupTest.java +++ b/reactor-pool/src/test/java/reactor/pool/PoolWarmupTest.java @@ -19,14 +19,12 @@ import org.assertj.core.data.Offset; import org.junit.jupiter.params.provider.MethodSource; import reactor.core.publisher.Flux; -import reactor.core.publisher.FluxSink; import reactor.core.publisher.Mono; import reactor.core.scheduler.Schedulers; import reactor.pool.TestUtils.ParameterizedTestWithName; import reactor.util.Logger; import reactor.util.Loggers; -import java.io.Closeable; import java.time.Duration; import java.util.ArrayList; import java.util.Arrays; @@ -113,7 +111,7 @@ public void execute(Runnable command) { * A DBConnection simulates an SQL "findAll" request, which is executed * through a DBConnectionThread executor. 
*/ - final static class DBConnection implements Closeable { + final static class DBConnection { final DBConnectionThread dbThread; public DBConnection(DBConnectionThread dbThread) { @@ -121,25 +119,10 @@ public DBConnection(DBConnectionThread dbThread) { } Flux findAll() { - return Flux.create(sink -> sink.onRequest(value -> dbThread.execute(() -> doPublish(sink)))); - } - - void doPublish(FluxSink sink) { - IntStream.range(0, 1000) - .peek(i -> sleep(1L)) - .forEach(value -> sink.next("table entry - " + value)); - sink.complete(); - } - - void sleep(long millis) { - try { - Thread.sleep(1); - } catch (InterruptedException ignored) {} - } - - @Override - public void close() { - dbThread.stop(); + return Flux.range(0, 1000) + .map(integer -> "table entry -" + integer) + .delayElements(Duration.ofMillis(1L)) + .publishOn(Schedulers.fromExecutor(dbThread)); } } From 3ce0667f060c6cb1199b7c39417100d6178d284d Mon Sep 17 00:00:00 2001 From: Pierre De Rop Date: Fri, 23 Jun 2023 10:02:50 +0200 Subject: [PATCH 5/9] Lazy warmup must use mergeSequential to ensure that primary is completed first. --- reactor-pool/src/main/java/reactor/pool/SimpleDequePool.java | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/reactor-pool/src/main/java/reactor/pool/SimpleDequePool.java b/reactor-pool/src/main/java/reactor/pool/SimpleDequePool.java index a9c253f0..a7804715 100644 --- a/reactor-pool/src/main/java/reactor/pool/SimpleDequePool.java +++ b/reactor-pool/src/main/java/reactor/pool/SimpleDequePool.java @@ -448,8 +448,9 @@ else if (sig.isOnError()) { //individual warmup failures decrement the permit and are logged .flatMap(i -> warmupMono(i, toWarmup, startWarmupIteration, allocator)); - // merge will eagerly subscribe to the primary and to all warmupFlux from the current thread. - Flux.merge(toWarmup + 1, primary, warmupFlux) + // mergeSequential will eagerly subscribe to the primary and to all warmupFlux from the current thread. + // The first completed source will be the primary, then the warmupFlux sources. + Flux.mergeSequential(toWarmup + 1, primary, warmupFlux) .onErrorResume(e -> Mono.empty()) .subscribe(poolable -> drain(), alreadyPropagatedOrLogged -> drain(), () -> drain()); } From e484c2617ac3af77c2de9d91763d2a7098cbe7d3 Mon Sep 17 00:00:00 2001 From: Pierre De Rop Date: Tue, 4 Jul 2023 19:36:19 +0200 Subject: [PATCH 6/9] - reverted delivery of borrower through the acquisition scheduler: this can be done in a separate PR. - resources allocated during warmup are subscribed to eagerly, but now you can configure the level of concurrency using `PoolBuilder.warmupConcurrency(int concurrency)` - improved the warpmupTest --- .../java/reactor/pool/DefaultPoolConfig.java | 40 ++- .../main/java/reactor/pool/PoolBuilder.java | 29 +- .../main/java/reactor/pool/PoolConfig.java | 15 +- .../java/reactor/pool/SimpleDequePool.java | 22 +- .../java/reactor/pool/PoolWarmupTest.java | 301 +++++++++--------- 5 files changed, 244 insertions(+), 163 deletions(-) diff --git a/reactor-pool/src/main/java/reactor/pool/DefaultPoolConfig.java b/reactor-pool/src/main/java/reactor/pool/DefaultPoolConfig.java index eebbf04d..47ee54e2 100644 --- a/reactor-pool/src/main/java/reactor/pool/DefaultPoolConfig.java +++ b/reactor-pool/src/main/java/reactor/pool/DefaultPoolConfig.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2019-2022 VMware Inc. or its affiliates, All Rights Reserved. + * Copyright (c) 2019-2023 VMware Inc. or its affiliates, All Rights Reserved. 
* * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -27,7 +27,6 @@ import reactor.core.Disposable; import reactor.core.publisher.Mono; import reactor.core.scheduler.Scheduler; -import reactor.core.scheduler.Schedulers; /** * A default {@link PoolConfig} that can be extended to bear more configuration options @@ -50,6 +49,7 @@ public class DefaultPoolConfig implements PoolConfig { protected final PoolMetricsRecorder metricsRecorder; protected final Clock clock; protected final boolean isIdleLRU; + protected final int warmupConcurrency; public DefaultPoolConfig(Mono allocator, AllocationStrategy allocationStrategy, @@ -63,7 +63,8 @@ public DefaultPoolConfig(Mono allocator, Scheduler acquisitionScheduler, PoolMetricsRecorder metricsRecorder, Clock clock, - boolean isIdleLRU) { + boolean isIdleLRU, + int warmupConcurrency) { this.pendingAcquireTimer = pendingAcquireTimer; this.allocator = allocator; this.allocationStrategy = allocationStrategy; @@ -77,6 +78,32 @@ public DefaultPoolConfig(Mono allocator, this.metricsRecorder = metricsRecorder; this.clock = clock; this.isIdleLRU = isIdleLRU; + this.warmupConcurrency = warmupConcurrency; + } + + /** + * @deprecated use the {@link #DefaultPoolConfig(Mono, AllocationStrategy, int, BiFunction, Function, Function, BiPredicate, Duration, Scheduler, Scheduler, PoolMetricsRecorder, Clock, boolean, int) other constructor} + * with explicit setting of warmupConcurrency, to be removed in 1.0.2 at the earliest. + * @since 1.0.1 + */ + @Deprecated + public DefaultPoolConfig(Mono allocator, + AllocationStrategy allocationStrategy, + int maxPending, + BiFunction pendingAcquireTimer, + Function> releaseHandler, + Function> destroyHandler, + BiPredicate evictionPredicate, + Duration evictInBackgroundInterval, + Scheduler evictInBackgroundScheduler, + Scheduler acquisitionScheduler, + PoolMetricsRecorder metricsRecorder, + Clock clock, + boolean isIdleLRU) { + this(allocator, allocationStrategy, maxPending, pendingAcquireTimer, releaseHandler, + destroyHandler, evictionPredicate, evictInBackgroundInterval, evictInBackgroundScheduler, + acquisitionScheduler, metricsRecorder, clock, isIdleLRU, + PoolBuilder.DEFAULT_WARMUP_CONCURRENCY); } /** @@ -101,6 +128,7 @@ protected DefaultPoolConfig(PoolConfig toCopy) { this.metricsRecorder = toCopyDpc.metricsRecorder; this.clock = toCopyDpc.clock; this.isIdleLRU = toCopyDpc.isIdleLRU; + this.warmupConcurrency = toCopyDpc.warmupConcurrency; } else { this.allocator = toCopy.allocator(); @@ -116,6 +144,7 @@ protected DefaultPoolConfig(PoolConfig toCopy) { this.metricsRecorder = toCopy.metricsRecorder(); this.clock = toCopy.clock(); this.isIdleLRU = toCopy.reuseIdleResourcesInLruOrder(); + this.warmupConcurrency = toCopy.warmupConcurrency(); } } @@ -139,6 +168,11 @@ public BiFunction pendingAcquireTimer() { return this.pendingAcquireTimer; } + @Override + public int warmupConcurrency() { + return this.warmupConcurrency; + } + @Override public Function> releaseHandler() { return this.releaseHandler; diff --git a/reactor-pool/src/main/java/reactor/pool/PoolBuilder.java b/reactor-pool/src/main/java/reactor/pool/PoolBuilder.java index e254f9cd..257bd8e5 100644 --- a/reactor-pool/src/main/java/reactor/pool/PoolBuilder.java +++ b/reactor-pool/src/main/java/reactor/pool/PoolBuilder.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2018-2022 VMware Inc. or its affiliates, All Rights Reserved. + * Copyright (c) 2018-2023 VMware Inc. 
or its affiliates, All Rights Reserved. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -78,6 +78,7 @@ public static PoolBuilder> from(Publisher allo PoolMetricsRecorder metricsRecorder = NoOpPoolMetricsRecorder.INSTANCE; boolean idleLruOrder = true; BiFunction pendingAcquireTimer = DEFAULT_PENDING_ACQUIRE_TIMER; + int warmupConcurrency = DEFAULT_WARMUP_CONCURRENCY; PoolBuilder(Mono allocator, Function, CONF> configModifier) { this.allocator = allocator; @@ -430,6 +431,27 @@ public > PoolBuilder extraConfiguration(Fu return new PoolBuilder<>(this, this.configModifier.andThen(configModifier)); } + /** + * Specifies the concurrency level used when the allocator is subscribed to during the warmup phase. + * During warmup, resources that can be pre-allocated will be created eagerly, but at most {@code concurrency} resources are + * subscribed to at the same time. + * A concurrency level of 1 means that warmed-up resources will be pre-allocated one after the other, not concurrently. + * A concurrency level of {@code Integer.MAX_VALUE} means that all pre-allocated resources will be created eagerly, with all resources being + * subscribed to from the current thread. + * By default, the concurrency level is set to {@code Integer.MAX_VALUE}, meaning that the allocator is subscribed to with the + * highest possible concurrency level. + * + * @param warmupConcurrency The concurrency level used when the allocator is subscribed to during the warmup phase, must be positive, + * {@code Integer.MAX_VALUE} by default + */ + public PoolBuilder warmupConcurrency(int warmupConcurrency) { + if (warmupConcurrency < 1) { + throw new IllegalArgumentException("warmupConcurrency must be positive"); + } + this.warmupConcurrency = warmupConcurrency; + return this; + } + /** * Construct a default reactor pool with the builder's configuration. * @@ -479,7 +501,8 @@ CONF buildConfig() { acquisitionScheduler, metricsRecorder, clock, - idleLruOrder); + idleLruOrder, + warmupConcurrency); return this.configModifier.apply(baseConfig); } @@ -501,5 +524,5 @@ static BiPredicate idlePredicate(Duration maxIdleTime) static final Function> NOOP_HANDLER = it -> Mono.empty(); static final BiPredicate NEVER_PREDICATE = (ignored1, ignored2) -> false; static final BiFunction DEFAULT_PENDING_ACQUIRE_TIMER = (r, d) -> Schedulers.parallel().schedule(r, d.toNanos(), TimeUnit.NANOSECONDS); - + static final int DEFAULT_WARMUP_CONCURRENCY = Integer.MAX_VALUE; } diff --git a/reactor-pool/src/main/java/reactor/pool/PoolConfig.java b/reactor-pool/src/main/java/reactor/pool/PoolConfig.java index 99e38860..0d1cf84e 100644 --- a/reactor-pool/src/main/java/reactor/pool/PoolConfig.java +++ b/reactor-pool/src/main/java/reactor/pool/PoolConfig.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2018-2022 VMware Inc. or its affiliates, All Rights Reserved. + * Copyright (c) 2018-2023 VMware Inc. or its affiliates, All Rights Reserved. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -141,4 +141,17 @@ default BiFunction pendingAcquireTimer() { return PoolBuilder.DEFAULT_PENDING_ACQUIRE_TIMER; } + /** + * Specifies the concurrency level used when the allocator is subscribed to during the warmup phase. + * During warmup, resources that can be pre-allocated will be created eagerly, but at most {@code concurrency} resources are + * subscribed to at the same time. 
+ * A concurrency level of 1 means that warmed-up resources will be pre-allocated one after the other, not concurrently. + * A concurrency level of {@code Integer.MAX_VALUE} means that all pre-allocated resources will be created eagerly, with all resources being + * subscribed to from the current thread. + * By default, the concurrency level is set to {@code Integer.MAX_VALUE}, meaning that the allocator is subscribed to with the + * highest possible concurrency level. + */ + default int warmupConcurrency() { + return PoolBuilder.DEFAULT_WARMUP_CONCURRENCY; + } } diff --git a/reactor-pool/src/main/java/reactor/pool/SimpleDequePool.java b/reactor-pool/src/main/java/reactor/pool/SimpleDequePool.java index a7804715..6e25dd68 100644 --- a/reactor-pool/src/main/java/reactor/pool/SimpleDequePool.java +++ b/reactor-pool/src/main/java/reactor/pool/SimpleDequePool.java @@ -43,6 +43,7 @@ import reactor.core.scheduler.Schedulers; import reactor.util.Loggers; import reactor.util.annotation.Nullable; +import reactor.util.concurrent.Queues; /** * The {@link SimpleDequePool} is based on {@link Deque} for idle resources and pending {@link Pool#acquire()} Monos, @@ -249,8 +250,10 @@ public Mono warmup() { .returnPermits(1); }); } - // merge will eagerly subscribe to all warmups from the current thread - return Flux.merge(allWarmups.length, allWarmups) + // merge will eagerly subscribe to all warmups from the current thread, but + // the concurrency can be controlled from configuration. + int warmupConcurrency = Math.min(allWarmups.length, poolConfig.warmupConcurrency()); + return Flux.merge(Flux.fromArray(allWarmups), warmupConcurrency) .reduce(0, (count, p) -> count + 1); }); } @@ -419,8 +422,7 @@ private void drainLoop() { assert newInstance != null; ACQUIRED.incrementAndGet(this); metricsRecorder.recordAllocationSuccessAndLatency(clock.millis() - start); - poolConfig.acquisitionScheduler() - .schedule(() -> borrower.deliver(createSlot(newInstance))); + borrower.deliver(createSlot(newInstance)); } else if (sig.isOnError()) { Throwable error = sig.getThrowable(); @@ -444,13 +446,13 @@ else if (sig.isOnError()) { logger.debug("should warm up {} extra resources", toWarmup); final long startWarmupIteration = clock.millis(); - Flux warmupFlux = Flux.range(1, toWarmup) - //individual warmup failures decrement the permit and are logged - .flatMap(i -> warmupMono(i, toWarmup, startWarmupIteration, allocator)); + Flux> monos = Flux.range(0, toWarmup + 1) + .map(n -> (n == 0) ? primary : warmupMono(n, toWarmup, startWarmupIteration, allocator)); - // mergeSequential will eagerly subscribe to the primary and to all warmupFlux from the current thread. - // The first completed source will be the primary, then the warmupFlux sources. 
- Flux.mergeSequential(toWarmup + 1, primary, warmupFlux) + // merge will eagerly subscribe to the allocator from the current thread, but the concurrency + // can be controlled from configuration + int warmupConcurrency = Math.min(toWarmup + 1, poolConfig.warmupConcurrency()); + Flux.merge(monos, warmupConcurrency, Queues.XS_BUFFER_SIZE) .onErrorResume(e -> Mono.empty()) .subscribe(poolable -> drain(), alreadyPropagatedOrLogged -> drain(), () -> drain()); } diff --git a/reactor-pool/src/test/java/reactor/pool/PoolWarmupTest.java b/reactor-pool/src/test/java/reactor/pool/PoolWarmupTest.java index 78de331c..0d272ab8 100644 --- a/reactor-pool/src/test/java/reactor/pool/PoolWarmupTest.java +++ b/reactor-pool/src/test/java/reactor/pool/PoolWarmupTest.java @@ -16,23 +16,24 @@ package reactor.pool; -import org.assertj.core.data.Offset; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; import org.junit.jupiter.params.provider.MethodSource; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; +import reactor.core.scheduler.Scheduler; import reactor.core.scheduler.Schedulers; -import reactor.pool.TestUtils.ParameterizedTestWithName; import reactor.util.Logger; import reactor.util.Loggers; import java.time.Duration; import java.util.ArrayList; -import java.util.Arrays; import java.util.List; import java.util.concurrent.Executor; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Collectors; import java.util.stream.IntStream; @@ -65,148 +66,156 @@ * @author Pierre De Rop */ public class PoolWarmupTest { - /** - * The test will be run twice: with InstrumentedPool warmups, and without it. - * If warmup is not done, then the warmup will be internally done - * lazily when the first acquire is taking place because the allocation strategy is - * configured with minResources == maxResources. - */ - static List warmupOptions() { return Arrays.asList(Boolean.FALSE, Boolean.TRUE); } - - static final Logger LOGGER = Loggers.getLogger(PoolWarmupTest.class); - - /** - * Each DBConnection will use one of the following DBConnection Executor - */ - final static class DBConnectionThread implements Executor { - final static ThreadLocal current = ThreadLocal.withInitial(() -> null); - - final ExecutorService dbThread; - - DBConnectionThread(String name) { - dbThread = Executors.newSingleThreadExecutor(r -> new Thread(() -> { - current.set(DBConnectionThread.this); - r.run(); - }, name)); - } - - void stop() { - dbThread.shutdown(); - try { - if (! dbThread.awaitTermination(30, TimeUnit.SECONDS)) { - throw new IllegalStateException("Could not stop db thread timely."); - } - } catch (InterruptedException e) { - throw new RuntimeException(e); - } - } - - @Override - public void execute(Runnable command) { - dbThread.execute(command); - } - } - - /** - * A DBConnection simulates an SQL "findAll" request, which is executed - * through a DBConnectionThread executor. 
- */ - final static class DBConnection { - final DBConnectionThread dbThread; - - public DBConnection(DBConnectionThread dbThread) { - this.dbThread = dbThread; - } - - Flux findAll() { - return Flux.range(0, 1000) - .map(integer -> "table entry -" + integer) - .delayElements(Duration.ofMillis(1L)) - .publishOn(Schedulers.fromExecutor(dbThread)); - } - } - - /** - * A DBConnection Pool, based on Reactor-Pool "InstrumentedPool", configured with minResources == maxResources - */ - final static class DBConnectionPool { - final int poolSize; - final DBConnectionThread[] dbThreads; - final InstrumentedPool pool; - final static AtomicInteger roundRobin = new AtomicInteger(); - - DBConnectionPool(int poolSize) { - this.poolSize = poolSize; - this.dbThreads = new DBConnectionThread[poolSize]; - IntStream.range(0, poolSize).forEach(i -> dbThreads[i] = new DBConnectionThread("dbthread-" + i)); - - pool = PoolBuilder - .from(Mono.defer(() -> { - // if the current thread is already one of our DB thread, then DBConnection.findAll will use - // this current thread, else, let's select one in a round-robin way. - DBConnectionThread dbThread = DBConnectionThread.current.get(); - dbThread = dbThread == null ? - dbThreads[(roundRobin.incrementAndGet() & 0x7F_FF_FF_FF) % dbThreads.length] : dbThread; - return Mono.just(new DBConnection(dbThread)) - .delayElement(Duration.ofMillis(10)) // simulate Database handshaking (authentication, etc ...) - .publishOn(Schedulers.fromExecutor(dbThread)); - })) - .sizeBetween(10, 10) - .idleResourceReuseOrder(false) - .buildPool(); - } - - InstrumentedPool getPool() { - return pool; - } - - void stop() { - pool.disposeLater().block(Duration.ofSeconds(30)); - Stream.of(dbThreads).forEach(DBConnectionThread::stop); - } - } - - @ParameterizedTestWithName - @MethodSource("warmupOptions") - void warmupTest(boolean doWarmup) { - int poolSize = 10; - DBConnectionPool dbConnectionPool = new DBConnectionPool(poolSize); - - try { - InstrumentedPool pool = dbConnectionPool.getPool(); - if (doWarmup) { - pool.warmup().block(); - } - - long startTime = System.currentTimeMillis(); - - List> fluxes = IntStream.rangeClosed(1, poolSize) - .mapToObj(i -> Flux.from(pool.withPoolable(DBConnection::findAll) - .doOnComplete(() -> LOGGER.info(": " + i + "-findAll done")))) - .collect(Collectors.toList()); - - List> next = new ArrayList<>(); - for (Flux flux : fluxes) { - next.add(flux.count().doOnNext(number -> LOGGER.info("num:" + number))); - } - - Flux.fromIterable(next) - .flatMap(x -> x) - .collectList() - .block(Duration.ofSeconds(60)); - - long elapsed = (System.currentTimeMillis() - startTime); - LOGGER.info("Elapsed time: " + elapsed); - - // each "dbConnection.findAll()" should take around 1000 millis, we have - // 10 subscriptions, but we expect subscriptions to be served concurrently using - // our 10 DBConnections threads from the pool ... So the elapsed time should be around 1000 millis, not 10 000 millis ! 
- assertThat(elapsed).isCloseTo(1000, Offset.offset(3000L)); - } - - finally { - dbConnectionPool.stop(); - } - } + static final Logger LOGGER = Loggers.getLogger(PoolWarmupTest.class); + + protected static Stream warmupTestArgs() { + return Stream.of( + Arguments.of(true, 10, Schedulers.immediate()), + Arguments.of(true, 1, Schedulers.single()), + Arguments.of(false, 10, Schedulers.immediate()), + Arguments.of(false, 1, Schedulers.single()) + ); + } + + + /** + * Each DBConnection will use one of the following DBConnection Executor + */ + final static class DBConnectionThread implements Executor { + final static ThreadLocal current = ThreadLocal.withInitial(() -> null); + + final ExecutorService dbThread; + final AtomicBoolean used = new AtomicBoolean(false); + + DBConnectionThread(String name) { + dbThread = Executors.newSingleThreadExecutor(r -> new Thread(() -> { + current.set(DBConnectionThread.this); + r.run(); + }, name)); + } + + void stop() { + dbThread.shutdown(); + try { + if (!dbThread.awaitTermination(30, TimeUnit.SECONDS)) { + throw new IllegalStateException("Could not stop db thread timely."); + } + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + + @Override + public void execute(Runnable command) { + used.set(true); + dbThread.execute(command); + } + } + + /** + * A DBConnection simulates an SQL "findAll" request, which is executed + * through a DBConnectionThread executor. + */ + final static class DBConnection { + final DBConnectionThread dbThread; + + public DBConnection(DBConnectionThread dbThread) { + this.dbThread = dbThread; + } + + Flux findAll() { + return Flux.range(0, 1000) + .map(integer -> "table entry -" + integer) + .delayElements(Duration.ofMillis(1L)) + .publishOn(Schedulers.fromExecutor(dbThread)); + } + } + + /** + * A DBConnection Pool, based on Reactor-Pool "InstrumentedPool", configured with minResources == maxResources + */ + final static class DBConnectionPool { + final int poolSize; + final DBConnectionThread[] dbThreads; + final InstrumentedPool pool; + final static AtomicInteger roundRobin = new AtomicInteger(); + + DBConnectionPool(int poolSize, int warmupConcurrency, Scheduler allocatorSubscribeScheduler) { + this.poolSize = poolSize; + this.dbThreads = new DBConnectionThread[poolSize]; + IntStream.range(0, poolSize).forEach(i -> dbThreads[i] = new DBConnectionThread("dbthread-" + i)); + + pool = PoolBuilder + .from(Mono.defer(() -> { + // if the current thread is already one of our DB thread, then DBConnection.findAll will use + // this current thread, else, let's select one in a round-robin way. + DBConnectionThread dbThread = DBConnectionThread.current.get(); + dbThread = dbThread == null ? + dbThreads[(roundRobin.incrementAndGet() & 0x7F_FF_FF_FF) % dbThreads.length] : dbThread; + return Mono.just(new DBConnection(dbThread)) + .doOnSubscribe(subscription -> LOGGER.warn("subscribe")) + .delayElement(Duration.ofMillis(10)) // simulate Database handshaking (authentication, etc ...) 
+ .publishOn(Schedulers.fromExecutor(dbThread)); + }) + .subscribeOn(allocatorSubscribeScheduler)) + .sizeBetween(10, 10) + .idleResourceReuseOrder(false) + .warmupConcurrency(warmupConcurrency) + .buildPool(); + } + + InstrumentedPool getPool() { + return pool; + } + + long dbThreadsUsed() { + return Stream.of(dbThreads) + .filter(dbThread -> dbThread.used.get()) + .count(); + } + + void stop() { + pool.disposeLater().block(Duration.ofSeconds(30)); + Stream.of(dbThreads).forEach(DBConnectionThread::stop); + } + } + + @ParameterizedTest + @MethodSource("warmupTestArgs") + void warmupTest(boolean doWarmup, int warmupConcurrency, Scheduler allocatorSubscribeScheduler) { + int poolSize = 10; + DBConnectionPool dbConnectionPool = new DBConnectionPool(poolSize, warmupConcurrency, allocatorSubscribeScheduler); + + try { + InstrumentedPool pool = dbConnectionPool.getPool(); + if (doWarmup) { + pool.warmup().block(); + } + + long startTime = System.currentTimeMillis(); + + List> fluxes = IntStream.rangeClosed(1, poolSize) + .mapToObj(i -> Flux.from(pool.withPoolable(DBConnection::findAll) + .doOnComplete(() -> LOGGER.info(": " + i + "-findAll done")))) + .collect(Collectors.toList()); + + List> next = new ArrayList<>(); + for (Flux flux : fluxes) { + next.add(flux.count().doOnNext(number -> LOGGER.info("num:" + number))); + } + + Flux.fromIterable(next) + .flatMap(x -> x) + .collectList() + .block(Duration.ofSeconds(60)); + + long elapsed = (System.currentTimeMillis() - startTime); + LOGGER.info("Elapsed time: " + elapsed + ", concurrency=" + dbConnectionPool.dbThreadsUsed()); + + assertThat(dbConnectionPool.dbThreadsUsed()).isEqualTo(10); + } finally { + dbConnectionPool.stop(); + } + } } From c4a5af8d83d6556d55ee2ee62472450e30d0c7c1 Mon Sep 17 00:00:00 2001 From: Pierre De Rop Date: Wed, 5 Jul 2023 11:57:46 +0200 Subject: [PATCH 7/9] Replaced PoolBuilder.warmupConcurrenty(int concurrency) method by PoolBuilder.parallelizeWarmup(boolean). Warmups are not parallel (as before). Reverted CommonPoolTest.recordsAllocationLatenciesInWarmup test, because warmup is like before, by default: no concurrency. Improved PoolWarmupTest. 
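Reviewer note (not part of the committed diff): a minimal usage sketch of the opt-in eager warmup added by this commit. It relies only on the existing PoolBuilder API plus the parallelizeWarmup(boolean) method introduced here; the Connection resource type and the pool sizes are illustrative placeholders.

import reactor.core.publisher.Mono;
import reactor.pool.InstrumentedPool;
import reactor.pool.PoolBuilder;

class ParallelWarmupSketch {
    // hypothetical resource type, stands in for a real connection
    static final class Connection { }

    public static void main(String[] args) {
        InstrumentedPool<Connection> pool = PoolBuilder
                .from(Mono.fromCallable(Connection::new)) // allocator: one new Connection per subscription
                .sizeBetween(10, 10)                      // min == max, so warmup pre-allocates 10 resources
                .parallelizeWarmup(true)                  // new in this commit: subscribe to all 10 allocations eagerly
                .buildPool();

        // completes once the 10 pre-allocated resources have been created
        Integer warmedUp = pool.warmup().block();
        System.out.println("warmed up " + warmedUp + " resources");
    }
}

With parallelizeWarmup(false) (the default) the same warmup() call subscribes to the allocator sequentially, one resource at a time, matching the pre-existing behaviour.
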
--- .../java/reactor/pool/DefaultPoolConfig.java | 20 +++++----- .../main/java/reactor/pool/PoolBuilder.java | 37 ++++++++++--------- .../main/java/reactor/pool/PoolConfig.java | 27 +++++++++----- .../java/reactor/pool/SimpleDequePool.java | 10 ++--- .../java/reactor/pool/CommonPoolTest.java | 34 +++++++++++++++++ .../java/reactor/pool/PoolWarmupTest.java | 27 +++++++++----- 6 files changed, 103 insertions(+), 52 deletions(-) diff --git a/reactor-pool/src/main/java/reactor/pool/DefaultPoolConfig.java b/reactor-pool/src/main/java/reactor/pool/DefaultPoolConfig.java index 47ee54e2..b8962297 100644 --- a/reactor-pool/src/main/java/reactor/pool/DefaultPoolConfig.java +++ b/reactor-pool/src/main/java/reactor/pool/DefaultPoolConfig.java @@ -49,7 +49,7 @@ public class DefaultPoolConfig implements PoolConfig { protected final PoolMetricsRecorder metricsRecorder; protected final Clock clock; protected final boolean isIdleLRU; - protected final int warmupConcurrency; + protected final boolean parallelizeWarmup; public DefaultPoolConfig(Mono allocator, AllocationStrategy allocationStrategy, @@ -64,7 +64,7 @@ public DefaultPoolConfig(Mono allocator, PoolMetricsRecorder metricsRecorder, Clock clock, boolean isIdleLRU, - int warmupConcurrency) { + boolean parallelizeWarmup) { this.pendingAcquireTimer = pendingAcquireTimer; this.allocator = allocator; this.allocationStrategy = allocationStrategy; @@ -78,12 +78,12 @@ public DefaultPoolConfig(Mono allocator, this.metricsRecorder = metricsRecorder; this.clock = clock; this.isIdleLRU = isIdleLRU; - this.warmupConcurrency = warmupConcurrency; + this.parallelizeWarmup = parallelizeWarmup; } /** - * @deprecated use the {@link #DefaultPoolConfig(Mono, AllocationStrategy, int, BiFunction, Function, Function, BiPredicate, Duration, Scheduler, Scheduler, PoolMetricsRecorder, Clock, boolean, int) other constructor} - * with explicit setting of warmupConcurrency, to be removed in 1.0.2 at the earliest. + * @deprecated use the {@link #DefaultPoolConfig(Mono, AllocationStrategy, int, BiFunction, Function, Function, BiPredicate, Duration, Scheduler, Scheduler, PoolMetricsRecorder, Clock, boolean, boolean) other constructor} + * with explicit setting of parallelizeWarmup, to be removed in 1.0.2 at the earliest. 
* @since 1.0.1 */ @Deprecated @@ -103,7 +103,7 @@ public DefaultPoolConfig(Mono allocator, this(allocator, allocationStrategy, maxPending, pendingAcquireTimer, releaseHandler, destroyHandler, evictionPredicate, evictInBackgroundInterval, evictInBackgroundScheduler, acquisitionScheduler, metricsRecorder, clock, isIdleLRU, - PoolBuilder.DEFAULT_WARMUP_CONCURRENCY); + PoolBuilder.DEFAULT_PARALLELIZE_WARMUP); } /** @@ -128,7 +128,7 @@ protected DefaultPoolConfig(PoolConfig toCopy) { this.metricsRecorder = toCopyDpc.metricsRecorder; this.clock = toCopyDpc.clock; this.isIdleLRU = toCopyDpc.isIdleLRU; - this.warmupConcurrency = toCopyDpc.warmupConcurrency; + this.parallelizeWarmup = toCopyDpc.parallelizeWarmup; } else { this.allocator = toCopy.allocator(); @@ -144,7 +144,7 @@ protected DefaultPoolConfig(PoolConfig toCopy) { this.metricsRecorder = toCopy.metricsRecorder(); this.clock = toCopy.clock(); this.isIdleLRU = toCopy.reuseIdleResourcesInLruOrder(); - this.warmupConcurrency = toCopy.warmupConcurrency(); + this.parallelizeWarmup = toCopy.parallelizeWarmup(); } } @@ -169,8 +169,8 @@ public BiFunction pendingAcquireTimer() { } @Override - public int warmupConcurrency() { - return this.warmupConcurrency; + public boolean parallelizeWarmup() { + return this.parallelizeWarmup; } @Override diff --git a/reactor-pool/src/main/java/reactor/pool/PoolBuilder.java b/reactor-pool/src/main/java/reactor/pool/PoolBuilder.java index 257bd8e5..64c9f27f 100644 --- a/reactor-pool/src/main/java/reactor/pool/PoolBuilder.java +++ b/reactor-pool/src/main/java/reactor/pool/PoolBuilder.java @@ -78,7 +78,7 @@ public static PoolBuilder> from(Publisher allo PoolMetricsRecorder metricsRecorder = NoOpPoolMetricsRecorder.INSTANCE; boolean idleLruOrder = true; BiFunction pendingAcquireTimer = DEFAULT_PENDING_ACQUIRE_TIMER; - int warmupConcurrency = DEFAULT_WARMUP_CONCURRENCY; + boolean parallelizeWarmup = DEFAULT_PARALLELIZE_WARMUP; PoolBuilder(Mono allocator, Function, CONF> configModifier) { this.allocator = allocator; @@ -432,23 +432,24 @@ public > PoolBuilder extraConfiguration(Fu } /** - * Specifies the concurrency level used when the allocator is subscribed to during the warmup phase. - * During warmup, resources that can be pre-allocated will be created eagerly, but at most {@code concurrency} resources are - * subscribed to at the same time. - * A concurrency level of 1 means that warmed-up resources will be pre-allocated one after the other, not concurrently. - * A concurrency level of {@code Integer.MAX_VALUE} means that all pre-allocated resources will be created eagerly, with all resources being - * subscribed to from the current thread. - * By default, the concurrency level is set to {@code Integer.MAX_VALUE}, meaning that the allocator is subscribed to with the - * highest possible concurrency level. + * Specifies if the allocator should be subscribed to eagerly during warmup phase. + *

+ * Setting the {@code enableParallelism} flag to {@code true} means that during warmup, all resources that must be pre-allocated will be + * created eagerly. The allocator will be eagerly subscribed to from the current thread for each pre-allocated resource. + *

+ * Setting the {@code enableParallelism} flag to {@code false} means that pre-allocation of resources is achieved by + * sequentially subscribing to the allocator, waiting for each resource to be created before subscribing to the allocator again, + * and so on until the last pre-allocated resource has been created. + * + *

+ * By default, the warmup parallelism is disabled. + * + * @see #allocator * - * @param warmupConcurrency The concurrency level used when the allocator is subscribed to during the warmup phase, must be positive, - * {@code Integer.MAX_VALUE} by default + * @param enableParallelism Specifies if the allocator should be subscribed to eagerly during warmup phase, {@code false} by default */ - public PoolBuilder warmupConcurrency(int warmupConcurrency) { - if (warmupConcurrency < 1) { - throw new IllegalArgumentException("warmupConcurrency must be positive"); - } - this.warmupConcurrency = warmupConcurrency; + public PoolBuilder parallelizeWarmup(boolean enableParallelism) { + this.parallelizeWarmup = enableParallelism; return this; } @@ -502,7 +503,7 @@ CONF buildConfig() { metricsRecorder, clock, idleLruOrder, - warmupConcurrency); + parallelizeWarmup); return this.configModifier.apply(baseConfig); } @@ -524,5 +525,5 @@ static BiPredicate idlePredicate(Duration maxIdleTime) static final Function> NOOP_HANDLER = it -> Mono.empty(); static final BiPredicate NEVER_PREDICATE = (ignored1, ignored2) -> false; static final BiFunction DEFAULT_PENDING_ACQUIRE_TIMER = (r, d) -> Schedulers.parallel().schedule(r, d.toNanos(), TimeUnit.NANOSECONDS); - static final int DEFAULT_WARMUP_CONCURRENCY = Integer.MAX_VALUE; + static final boolean DEFAULT_PARALLELIZE_WARMUP = false; } diff --git a/reactor-pool/src/main/java/reactor/pool/PoolConfig.java b/reactor-pool/src/main/java/reactor/pool/PoolConfig.java index 0d1cf84e..96b91371 100644 --- a/reactor-pool/src/main/java/reactor/pool/PoolConfig.java +++ b/reactor-pool/src/main/java/reactor/pool/PoolConfig.java @@ -142,16 +142,23 @@ default BiFunction pendingAcquireTimer() { } /** - * Specifies the concurrency level used when the allocator is subscribed to during the warmup phase. - * During warmup, resources that can be pre-allocated will be created eagerly, but at most {@code concurrency} resources are - * subscribed to at the same time. - * A concurrency level of 1 means that warmed-up resources will be pre-allocated one after the other, not concurrently. - * A concurrency level of {@code Integer.MAX_VALUE} means that all pre-allocated resources will be created eagerly, with all resources being - * subscribed to from the current thread. - * By default, the concurrency level is set to {@code Integer.MAX_VALUE}, meaning that the allocator is subscribed to with the - * highest possible concurrency level. + * Specifies if the allocator should be subscribed to eagerly during warmup phase. + *

+ * Returning {@code true} means that during warmup, all resources that must be pre-allocated will be + * created eagerly. The allocator will be eagerly subscribed to from the current thread for each pre-allocated resource. + *

+ * Returning {@code false} means that pre-allocation of resources is achieved by + * sequentially subscribing to the allocator, waiting for each resource to be created before subscribing to the allocator again, + * and so on until the last pre-allocated resource has been created. + * + *

+ * By default, the warmup parallelism is disabled. + * + * @see #allocator + * + * @return {@code true} if the allocator should be subscribed to eagerly during warmup phase */ - default int warmupConcurrency() { - return PoolBuilder.DEFAULT_WARMUP_CONCURRENCY; + default boolean parallelizeWarmup() { + return PoolBuilder.DEFAULT_PARALLELIZE_WARMUP; } } diff --git a/reactor-pool/src/main/java/reactor/pool/SimpleDequePool.java b/reactor-pool/src/main/java/reactor/pool/SimpleDequePool.java index 6e25dd68..6ed4349e 100644 --- a/reactor-pool/src/main/java/reactor/pool/SimpleDequePool.java +++ b/reactor-pool/src/main/java/reactor/pool/SimpleDequePool.java @@ -251,9 +251,9 @@ public Mono warmup() { }); } // merge will eagerly subscribe to all warmups from the current thread, but - // the concurrency can be controlled from configuration. - int warmupConcurrency = Math.min(allWarmups.length, poolConfig.warmupConcurrency()); - return Flux.merge(Flux.fromArray(allWarmups), warmupConcurrency) + // the parallelism can be controlled from configuration. + int mergeConcurrency = poolConfig.parallelizeWarmup() ? allWarmups.length : 1; + return Flux.merge(Flux.fromArray(allWarmups), mergeConcurrency) .reduce(0, (count, p) -> count + 1); }); } @@ -451,8 +451,8 @@ else if (sig.isOnError()) { // merge will eagerly subscribe to the allocator from the current thread, but the concurrency // can be controlled from configuration - int warmupConcurrency = Math.min(toWarmup + 1, poolConfig.warmupConcurrency()); - Flux.merge(monos, warmupConcurrency, Queues.XS_BUFFER_SIZE) + int mergeConcurrency = poolConfig.parallelizeWarmup() ? toWarmup + 1 : 1; + Flux.merge(monos, mergeConcurrency, Queues.XS_BUFFER_SIZE) .onErrorResume(e -> Mono.empty()) .subscribe(poolable -> drain(), alreadyPropagatedOrLogged -> drain(), () -> drain()); } diff --git a/reactor-pool/src/test/java/reactor/pool/CommonPoolTest.java b/reactor-pool/src/test/java/reactor/pool/CommonPoolTest.java index b7f86abe..59a79303 100644 --- a/reactor-pool/src/test/java/reactor/pool/CommonPoolTest.java +++ b/reactor-pool/src/test/java/reactor/pool/CommonPoolTest.java @@ -1401,6 +1401,40 @@ void recordsAllocationLatenciesInWarmup(PoolStyle configAdjuster) { .clock(recorder.getClock()); AbstractPool pool = configAdjuster.apply(builder); + assertThatIllegalStateException() + .isThrownBy(() -> pool.warmup().block()); + + assertThat(recorder.getAllocationTotalCount()).isEqualTo(2); + + long minSuccess = recorder.getAllocationSuccessHistogram().getMinValue(); + long minError = recorder.getAllocationErrorHistogram().getMinValue(); + + assertThat(minSuccess).as("allocation success latency").isGreaterThanOrEqualTo(100L); + assertThat(minError).as("allocation error latency").isGreaterThanOrEqualTo(200L); + } + + @ParameterizedTestWithName + @MethodSource("allPools") + @Tag("metrics") + void recordsAllocationLatenciesInEagerWarmup(PoolStyle configAdjuster) { + AtomicBoolean flip = new AtomicBoolean(); + //note the starter method here is irrelevant, only the config is created and passed to createPool + PoolBuilder builder = PoolBuilder + .from(Mono.defer(() -> { + if (flip.compareAndSet(false, true)) { + return Mono.just("foo").delayElement(Duration.ofMillis(100)); + } + else { + flip.compareAndSet(true, false); + return Mono.delay(Duration.ofMillis(200)).then(Mono.error(new IllegalStateException("boom"))); + } + })) + .sizeBetween(10, Integer.MAX_VALUE) + .parallelizeWarmup(true) + .metricsRecorder(recorder) + .clock(recorder.getClock()); + AbstractPool pool = 
configAdjuster.apply(builder); + // warmup will eagerly subscribe 10 times to the allocator. // The five first subscribtions will success (after around 100 millis), and some allocation should fail after around // 200 millis. diff --git a/reactor-pool/src/test/java/reactor/pool/PoolWarmupTest.java b/reactor-pool/src/test/java/reactor/pool/PoolWarmupTest.java index 0d272ab8..4d94039d 100644 --- a/reactor-pool/src/test/java/reactor/pool/PoolWarmupTest.java +++ b/reactor-pool/src/test/java/reactor/pool/PoolWarmupTest.java @@ -70,10 +70,14 @@ public class PoolWarmupTest { protected static Stream warmupTestArgs() { return Stream.of( - Arguments.of(true, 10, Schedulers.immediate()), - Arguments.of(true, 1, Schedulers.single()), - Arguments.of(false, 10, Schedulers.immediate()), - Arguments.of(false, 1, Schedulers.single()) + Arguments.of(true, true, Schedulers.immediate(), true), + Arguments.of(true, true, Schedulers.single(), true), + Arguments.of(true, false, Schedulers.single(), true), + Arguments.of(true, false, Schedulers.immediate(), false), + Arguments.of(false, true, Schedulers.immediate(), true), + Arguments.of(false, true, Schedulers.single(), true), + Arguments.of(false, false, Schedulers.single(), true), + Arguments.of(false, false, Schedulers.immediate(), false) ); } @@ -140,7 +144,7 @@ final static class DBConnectionPool { final InstrumentedPool pool; final static AtomicInteger roundRobin = new AtomicInteger(); - DBConnectionPool(int poolSize, int warmupConcurrency, Scheduler allocatorSubscribeScheduler) { + DBConnectionPool(int poolSize, boolean enableParallelWarmup, Scheduler allocatorSubscribeScheduler) { this.poolSize = poolSize; this.dbThreads = new DBConnectionThread[poolSize]; IntStream.range(0, poolSize).forEach(i -> dbThreads[i] = new DBConnectionThread("dbthread-" + i)); @@ -160,7 +164,7 @@ final static class DBConnectionPool { .subscribeOn(allocatorSubscribeScheduler)) .sizeBetween(10, 10) .idleResourceReuseOrder(false) - .warmupConcurrency(warmupConcurrency) + .parallelizeWarmup(enableParallelWarmup) .buildPool(); } @@ -182,9 +186,9 @@ void stop() { @ParameterizedTest @MethodSource("warmupTestArgs") - void warmupTest(boolean doWarmup, int warmupConcurrency, Scheduler allocatorSubscribeScheduler) { + void warmupTest(boolean doWarmup, boolean enableParallelWarmup, Scheduler allocatorSubscribeScheduler, boolean expectSuccess) { int poolSize = 10; - DBConnectionPool dbConnectionPool = new DBConnectionPool(poolSize, warmupConcurrency, allocatorSubscribeScheduler); + DBConnectionPool dbConnectionPool = new DBConnectionPool(poolSize, enableParallelWarmup, allocatorSubscribeScheduler); try { InstrumentedPool pool = dbConnectionPool.getPool(); @@ -212,7 +216,12 @@ void warmupTest(boolean doWarmup, int warmupConcurrency, Scheduler allocatorSubs long elapsed = (System.currentTimeMillis() - startTime); LOGGER.info("Elapsed time: " + elapsed + ", concurrency=" + dbConnectionPool.dbThreadsUsed()); - assertThat(dbConnectionPool.dbThreadsUsed()).isEqualTo(10); + if (expectSuccess) { + assertThat(dbConnectionPool.dbThreadsUsed()).isEqualTo(10); + } + else { + assertThat(dbConnectionPool.dbThreadsUsed()).isLessThan(10); + } } finally { dbConnectionPool.stop(); } From 6b686b964f103edbc44e78c2c5f6d99f935bb5cd Mon Sep 17 00:00:00 2001 From: Pierre De Rop Date: Wed, 5 Jul 2023 16:00:26 +0200 Subject: [PATCH 8/9] Replace PoolBuilder.parallelizeWarmup(boolean) by PoolBuilder.sizeBetween(min, max, warmupParallelism). 
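For illustration only (not part of this patch): a minimal sketch of how the new three-argument overload is expected to be used. The allocator, the fixed pool size of 10 and the warmup parallelism of 4 are arbitrary placeholder values.

import java.time.Duration;

import reactor.core.publisher.Mono;
import reactor.pool.InstrumentedPool;
import reactor.pool.PoolBuilder;

public class WarmupParallelismUsageSketch {
	public static void main(String[] args) {
		// allocator simulating a resource that takes roughly 100ms to create
		Mono<String> allocator = Mono.just("resource").delayElement(Duration.ofMillis(100));

		InstrumentedPool<String> pool = PoolBuilder
				.from(allocator)
				// keep exactly 10 resources; pre-allocate at most 4 of them concurrently during warmup
				.sizeBetween(10, 10, 4)
				.buildPool();

		// warmup() emits the number of resources that were pre-allocated
		Integer warmedUp = pool.warmup().block();
		System.out.println("warmed up " + warmedUp + " resources");
	}
}

With a warmup parallelism of 1 this behaves like the previous sequential warmup; a larger value lets the warmup subscribe to several allocations at once, up to the configured limit.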
--- .../reactor/pool/AllocationStrategies.java | 19 +++++- .../java/reactor/pool/AllocationStrategy.java | 20 ++++++- .../java/reactor/pool/DefaultPoolConfig.java | 40 +------------ .../main/java/reactor/pool/PoolBuilder.java | 59 ++++++++++--------- .../main/java/reactor/pool/PoolConfig.java | 22 +------ .../java/reactor/pool/SimpleDequePool.java | 4 +- .../SamplingAllocationStrategy.java | 7 ++- .../java/reactor/pool/CommonPoolTest.java | 3 +- .../java/reactor/pool/PoolWarmupTest.java | 35 ++++++----- 9 files changed, 103 insertions(+), 106 deletions(-) diff --git a/reactor-pool/src/main/java/reactor/pool/AllocationStrategies.java b/reactor-pool/src/main/java/reactor/pool/AllocationStrategies.java index 9fcef827..52b340c3 100644 --- a/reactor-pool/src/main/java/reactor/pool/AllocationStrategies.java +++ b/reactor-pool/src/main/java/reactor/pool/AllocationStrategies.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2019-2022 VMware Inc. or its affiliates, All Rights Reserved. + * Copyright (c) 2019-2023 VMware Inc. or its affiliates, All Rights Reserved. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -74,16 +74,28 @@ static final class SizeBasedAllocationStrategy implements AllocationStrategy { final int min; final int max; + final int warmupParallelism; volatile int permits; static final AtomicIntegerFieldUpdater PERMITS = AtomicIntegerFieldUpdater.newUpdater(SizeBasedAllocationStrategy.class, "permits"); SizeBasedAllocationStrategy(int min, int max) { + this(min, max, PoolBuilder.DEFAULT_WARMUP_PARALLELISM); + } + + SizeBasedAllocationStrategy(int min, int max, int warmupParallelism) { if (min < 0) throw new IllegalArgumentException("min must be positive or zero"); if (max < 1) throw new IllegalArgumentException("max must be strictly positive"); if (min > max) throw new IllegalArgumentException("min must be less than or equal to max"); + if (min > 0 && warmupParallelism < 1) { + throw new IllegalArgumentException("warmupParallelism must be greater than 0"); + } + if (min > 0 && warmupParallelism > min) { + throw new IllegalArgumentException("warmupParallelism must be less than or equal to min"); + } this.min = min; this.max = max; + this.warmupParallelism = warmupParallelism; PERMITS.lazySet(this, this.max); } @@ -146,5 +158,10 @@ public void returnPermits(int returned) { } } } + + @Override + public int warmupParallelism() { + return warmupParallelism; + } } } diff --git a/reactor-pool/src/main/java/reactor/pool/AllocationStrategy.java b/reactor-pool/src/main/java/reactor/pool/AllocationStrategy.java index 746a269c..0a1579d8 100644 --- a/reactor-pool/src/main/java/reactor/pool/AllocationStrategy.java +++ b/reactor-pool/src/main/java/reactor/pool/AllocationStrategy.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2019-2022 VMware Inc. or its affiliates, All Rights Reserved. + * Copyright (c) 2019-2023 VMware Inc. or its affiliates, All Rights Reserved. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -75,4 +75,22 @@ public interface AllocationStrategy { * is not consistent with the strategy's limits and delivered permits. */ void returnPermits(int returned); + + /** + * Return the concurrency level used when the allocator is subscribed to during the warmup phase, if any. + *

+ * The number of resources created concurrently will not exceed the value returned by {@code warmupParallelism()}. + * If the concurrency level is set to 1, pre-allocation of resources will be performed sequentially by subscribing to the allocator + * one at a time. The process waits for a resource to be created before subscribing again to the allocator. + * This sequence continues until all pre-allocated resources have been successfully created. + *

+ * Defaults to 1 + * + * @return The concurrency level used when the allocator is subscribed to during the warmup phase, must be positive, + * {@code 1} by default + * @since 1.0.1 + */ + default int warmupParallelism() { + return PoolBuilder.DEFAULT_WARMUP_PARALLELISM; + } } diff --git a/reactor-pool/src/main/java/reactor/pool/DefaultPoolConfig.java b/reactor-pool/src/main/java/reactor/pool/DefaultPoolConfig.java index b8962297..eebbf04d 100644 --- a/reactor-pool/src/main/java/reactor/pool/DefaultPoolConfig.java +++ b/reactor-pool/src/main/java/reactor/pool/DefaultPoolConfig.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2019-2023 VMware Inc. or its affiliates, All Rights Reserved. + * Copyright (c) 2019-2022 VMware Inc. or its affiliates, All Rights Reserved. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -27,6 +27,7 @@ import reactor.core.Disposable; import reactor.core.publisher.Mono; import reactor.core.scheduler.Scheduler; +import reactor.core.scheduler.Schedulers; /** * A default {@link PoolConfig} that can be extended to bear more configuration options @@ -49,7 +50,6 @@ public class DefaultPoolConfig implements PoolConfig { protected final PoolMetricsRecorder metricsRecorder; protected final Clock clock; protected final boolean isIdleLRU; - protected final boolean parallelizeWarmup; public DefaultPoolConfig(Mono allocator, AllocationStrategy allocationStrategy, @@ -63,8 +63,7 @@ public DefaultPoolConfig(Mono allocator, Scheduler acquisitionScheduler, PoolMetricsRecorder metricsRecorder, Clock clock, - boolean isIdleLRU, - boolean parallelizeWarmup) { + boolean isIdleLRU) { this.pendingAcquireTimer = pendingAcquireTimer; this.allocator = allocator; this.allocationStrategy = allocationStrategy; @@ -78,32 +77,6 @@ public DefaultPoolConfig(Mono allocator, this.metricsRecorder = metricsRecorder; this.clock = clock; this.isIdleLRU = isIdleLRU; - this.parallelizeWarmup = parallelizeWarmup; - } - - /** - * @deprecated use the {@link #DefaultPoolConfig(Mono, AllocationStrategy, int, BiFunction, Function, Function, BiPredicate, Duration, Scheduler, Scheduler, PoolMetricsRecorder, Clock, boolean, boolean) other constructor} - * with explicit setting of parallelizeWarmup, to be removed in 1.0.2 at the earliest. 
- * @since 1.0.1 - */ - @Deprecated - public DefaultPoolConfig(Mono allocator, - AllocationStrategy allocationStrategy, - int maxPending, - BiFunction pendingAcquireTimer, - Function> releaseHandler, - Function> destroyHandler, - BiPredicate evictionPredicate, - Duration evictInBackgroundInterval, - Scheduler evictInBackgroundScheduler, - Scheduler acquisitionScheduler, - PoolMetricsRecorder metricsRecorder, - Clock clock, - boolean isIdleLRU) { - this(allocator, allocationStrategy, maxPending, pendingAcquireTimer, releaseHandler, - destroyHandler, evictionPredicate, evictInBackgroundInterval, evictInBackgroundScheduler, - acquisitionScheduler, metricsRecorder, clock, isIdleLRU, - PoolBuilder.DEFAULT_PARALLELIZE_WARMUP); } /** @@ -128,7 +101,6 @@ protected DefaultPoolConfig(PoolConfig toCopy) { this.metricsRecorder = toCopyDpc.metricsRecorder; this.clock = toCopyDpc.clock; this.isIdleLRU = toCopyDpc.isIdleLRU; - this.parallelizeWarmup = toCopyDpc.parallelizeWarmup; } else { this.allocator = toCopy.allocator(); @@ -144,7 +116,6 @@ protected DefaultPoolConfig(PoolConfig toCopy) { this.metricsRecorder = toCopy.metricsRecorder(); this.clock = toCopy.clock(); this.isIdleLRU = toCopy.reuseIdleResourcesInLruOrder(); - this.parallelizeWarmup = toCopy.parallelizeWarmup(); } } @@ -168,11 +139,6 @@ public BiFunction pendingAcquireTimer() { return this.pendingAcquireTimer; } - @Override - public boolean parallelizeWarmup() { - return this.parallelizeWarmup; - } - @Override public Function> releaseHandler() { return this.releaseHandler; diff --git a/reactor-pool/src/main/java/reactor/pool/PoolBuilder.java b/reactor-pool/src/main/java/reactor/pool/PoolBuilder.java index 64c9f27f..7d09d858 100644 --- a/reactor-pool/src/main/java/reactor/pool/PoolBuilder.java +++ b/reactor-pool/src/main/java/reactor/pool/PoolBuilder.java @@ -78,7 +78,6 @@ public static PoolBuilder> from(Publisher allo PoolMetricsRecorder metricsRecorder = NoOpPoolMetricsRecorder.INSTANCE; boolean idleLruOrder = true; BiFunction pendingAcquireTimer = DEFAULT_PENDING_ACQUIRE_TIMER; - boolean parallelizeWarmup = DEFAULT_PARALLELIZE_WARMUP; PoolBuilder(Mono allocator, Function, CONF> configModifier) { this.allocator = allocator; @@ -350,6 +349,10 @@ public PoolBuilder releaseHandler(Function * {@code min} live resources before serving the acquire with (one of) the newly created resource(s). * At the same time it MUST NOT allocate any resource if that would bring the number of live resources * over the {@code max}, rejecting further allocations until some resources have been {@link PooledRef#release() released}. + *

+ * Pre-allocation of warmed-up resources, if any, will be performed sequentially by subscribing to the allocator + * one at a time. The process waits for a resource to be created before subscribing again to the allocator. + * This sequence continues until all pre-allocated resources have been successfully created. * * @param min the minimum number of live resources to keep in the pool (can be best effort) * @param max the maximum number of live resources to keep in the pool. use {@link Integer#MAX_VALUE} when you only need a @@ -359,7 +362,32 @@ public PoolBuilder releaseHandler(Function * @see #allocationStrategy(AllocationStrategy) */ public PoolBuilder sizeBetween(int min, int max) { - return allocationStrategy(new AllocationStrategies.SizeBasedAllocationStrategy(min, max)); + return sizeBetween(min, max, DEFAULT_WARMUP_PARALLELISM); + } + + /** + * Replace the {@link AllocationStrategy} with one that lets the {@link Pool} allocate between {@code min} and {@code max} resources. + * When acquiring and there is no available resource, the pool should strive to warm up enough resources to reach + * {@code min} live resources before serving the acquire with (one of) the newly created resource(s). + * At the same time it MUST NOT allocate any resource if that would bring the number of live resources + * over the {@code max}, rejecting further allocations until some resources have been {@link PooledRef#release() released}. + * + * @param min the minimum number of live resources to keep in the pool (can be best effort) + * @param max the maximum number of live resources to keep in the pool. use {@link Integer#MAX_VALUE} when you only need a + * minimum and no upper bound + * @param warmupParallelism Specifies the concurrency level used when the allocator is subscribed to during the warmup phase, if any. + * During warmup, resources that can be pre-allocated will be created eagerly, but at most {@code warmupParallelism} resources are + * subscribed to at the same time. + * A {@code warmupParallelism} of 1 means that pre-allocation of resources is achieved by sequentially subscribing to the allocator, + * waiting for a resource to be created before subscribing a next time to the allocator, and so on until the last allocation + * completes. + * @return this {@link Pool} builder + * @see #sizeUnbounded() + * @see #allocationStrategy(AllocationStrategy) + * @since 1.0.1 + */ + public PoolBuilder sizeBetween(int min, int max, int warmupParallelism) { + return allocationStrategy(new AllocationStrategies.SizeBasedAllocationStrategy(min, max, warmupParallelism)); } /** @@ -431,28 +459,6 @@ public > PoolBuilder extraConfiguration(Fu return new PoolBuilder<>(this, this.configModifier.andThen(configModifier)); } - /** - * Specifies if the allocator should be subscribed to eagerly during warmup phase. - *

- * Setting the {@code enableParallelism} flag to {@code true} means that during warmup, all resources that must be pre-allocated will be - * created eagerly. The allocator will be eagerly subscribed to from the current thread for each pre-allocated resources. - *

- * Setting the {@code enableParallelism} flag to {@code false} means that pre-allocation of resources is achieved by - * sequentially subscribing to the allocator, waiting for a resource to be created before subscribing a next time to the allocator, - * and so on until the last pre-allocated resource completes. - * - *

- * By default, the warmup parallelism is disabled. - * - * @see #allocator - * - * @param enableParallelism Specifies if the allocator should be subscribed to eagerly during warmup phase, {@code false} by default - */ - public PoolBuilder parallelizeWarmup(boolean enableParallelism) { - this.parallelizeWarmup = enableParallelism; - return this; - } - /** * Construct a default reactor pool with the builder's configuration. * @@ -502,8 +508,7 @@ CONF buildConfig() { acquisitionScheduler, metricsRecorder, clock, - idleLruOrder, - parallelizeWarmup); + idleLruOrder); return this.configModifier.apply(baseConfig); } @@ -525,5 +530,5 @@ static BiPredicate idlePredicate(Duration maxIdleTime) static final Function> NOOP_HANDLER = it -> Mono.empty(); static final BiPredicate NEVER_PREDICATE = (ignored1, ignored2) -> false; static final BiFunction DEFAULT_PENDING_ACQUIRE_TIMER = (r, d) -> Schedulers.parallel().schedule(r, d.toNanos(), TimeUnit.NANOSECONDS); - static final boolean DEFAULT_PARALLELIZE_WARMUP = false; + static final int DEFAULT_WARMUP_PARALLELISM = 1; } diff --git a/reactor-pool/src/main/java/reactor/pool/PoolConfig.java b/reactor-pool/src/main/java/reactor/pool/PoolConfig.java index 96b91371..99e38860 100644 --- a/reactor-pool/src/main/java/reactor/pool/PoolConfig.java +++ b/reactor-pool/src/main/java/reactor/pool/PoolConfig.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2018-2023 VMware Inc. or its affiliates, All Rights Reserved. + * Copyright (c) 2018-2022 VMware Inc. or its affiliates, All Rights Reserved. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -141,24 +141,4 @@ default BiFunction pendingAcquireTimer() { return PoolBuilder.DEFAULT_PENDING_ACQUIRE_TIMER; } - /** - * Specifies if the allocator should be subscribed to eagerly during warmup phase. - *

- * Returning {@code true} means that during warmup, all resources that must be pre-allocated will be - * created eagerly. The allocator will be eagerly subscribed to from the current thread for each pre-allocated resources. - *

- * Returning {@code false} means that pre-allocation of resources is achieved by - * sequentially subscribing to the allocator, waiting for a resource to be created before subscribing a next time to the allocator, - * and so on until the last pre-allocated resource completes. - * - *

- * By default, the warmup parallelism is disabled. - * - * @see #allocator - * - * @return {@code true} if the allocator should be subscribed to eagerly during warmup phase - */ - default boolean parallelizeWarmup() { - return PoolBuilder.DEFAULT_PARALLELIZE_WARMUP; - } } diff --git a/reactor-pool/src/main/java/reactor/pool/SimpleDequePool.java b/reactor-pool/src/main/java/reactor/pool/SimpleDequePool.java index 6ed4349e..dd4f84d8 100644 --- a/reactor-pool/src/main/java/reactor/pool/SimpleDequePool.java +++ b/reactor-pool/src/main/java/reactor/pool/SimpleDequePool.java @@ -252,7 +252,7 @@ public Mono warmup() { } // merge will eagerly subscribe to all warmups from the current thread, but // the parallelism can be controlled from configuration. - int mergeConcurrency = poolConfig.parallelizeWarmup() ? allWarmups.length : 1; + int mergeConcurrency = Math.min(poolConfig.allocationStrategy().warmupParallelism(), allWarmups.length); return Flux.merge(Flux.fromArray(allWarmups), mergeConcurrency) .reduce(0, (count, p) -> count + 1); }); @@ -451,7 +451,7 @@ else if (sig.isOnError()) { // merge will eagerly subscribe to the allocator from the current thread, but the concurrency // can be controlled from configuration - int mergeConcurrency = poolConfig.parallelizeWarmup() ? toWarmup + 1 : 1; + int mergeConcurrency = Math.min(poolConfig.allocationStrategy().warmupParallelism(), toWarmup + 1); Flux.merge(monos, mergeConcurrency, Queues.XS_BUFFER_SIZE) .onErrorResume(e -> Mono.empty()) .subscribe(poolable -> drain(), alreadyPropagatedOrLogged -> drain(), () -> drain()); diff --git a/reactor-pool/src/main/java/reactor/pool/introspection/SamplingAllocationStrategy.java b/reactor-pool/src/main/java/reactor/pool/introspection/SamplingAllocationStrategy.java index 6c7bb538..aa58a3b5 100644 --- a/reactor-pool/src/main/java/reactor/pool/introspection/SamplingAllocationStrategy.java +++ b/reactor-pool/src/main/java/reactor/pool/introspection/SamplingAllocationStrategy.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2021-2022 VMware Inc. or its affiliates, All Rights Reserved. + * Copyright (c) 2021-2023 VMware Inc. or its affiliates, All Rights Reserved. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. 
@@ -250,4 +250,9 @@ public int permitMinimum() { public int permitMaximum() { return delegate.permitMaximum(); } + + @Override + public int warmupParallelism() { + return delegate.warmupParallelism(); + } } \ No newline at end of file diff --git a/reactor-pool/src/test/java/reactor/pool/CommonPoolTest.java b/reactor-pool/src/test/java/reactor/pool/CommonPoolTest.java index 59a79303..43824fdc 100644 --- a/reactor-pool/src/test/java/reactor/pool/CommonPoolTest.java +++ b/reactor-pool/src/test/java/reactor/pool/CommonPoolTest.java @@ -1429,8 +1429,7 @@ void recordsAllocationLatenciesInEagerWarmup(PoolStyle configAdjuster) { return Mono.delay(Duration.ofMillis(200)).then(Mono.error(new IllegalStateException("boom"))); } })) - .sizeBetween(10, Integer.MAX_VALUE) - .parallelizeWarmup(true) + .sizeBetween(10, Integer.MAX_VALUE, 10) .metricsRecorder(recorder) .clock(recorder.getClock()); AbstractPool pool = configAdjuster.apply(builder); diff --git a/reactor-pool/src/test/java/reactor/pool/PoolWarmupTest.java b/reactor-pool/src/test/java/reactor/pool/PoolWarmupTest.java index 4d94039d..0a500d86 100644 --- a/reactor-pool/src/test/java/reactor/pool/PoolWarmupTest.java +++ b/reactor-pool/src/test/java/reactor/pool/PoolWarmupTest.java @@ -60,7 +60,7 @@ *

  • like in reactor-netty and r2dbc, when a DBConnection is created, it will either use * a dedicate thread that will be used to send SQL requests on the DBConnection, unless the current thread is already * a DBConnection thread. In this case, the current DBConnection thread will be used: this is similar to - * "colocated" TcpRespource EventLoops in reactor-netty.
  • + * "colocated" TcpResource EventLoops in reactor-netty. * * * @author Pierre De Rop @@ -70,14 +70,22 @@ public class PoolWarmupTest { protected static Stream warmupTestArgs() { return Stream.of( - Arguments.of(true, true, Schedulers.immediate(), true), - Arguments.of(true, true, Schedulers.single(), true), - Arguments.of(true, false, Schedulers.single(), true), - Arguments.of(true, false, Schedulers.immediate(), false), - Arguments.of(false, true, Schedulers.immediate(), true), - Arguments.of(false, true, Schedulers.single(), true), - Arguments.of(false, false, Schedulers.single(), true), - Arguments.of(false, false, Schedulers.immediate(), false) + // explicit warmup with parallelism=10, subscribes to allocator from current thread, expect success + Arguments.of(true, 10, Schedulers.immediate(), true), + // explicit warmup with parallelism=10, subscribes to allocator from Schedulers.single, expect success + Arguments.of(true, 10, Schedulers.single(), true), + // explicit warmup with parallelism=1, subscribes to allocator from Schedulers.single, expect success + Arguments.of(true, 1, Schedulers.single(), true), + // explicit warmup with parallelism=1, subscribes to allocator from current thread, expect failure + Arguments.of(true, 1, Schedulers.immediate(), false), + // implicit warmup with parallelism=10, subscribes to allocator from current thread, expect success + Arguments.of(false, 10, Schedulers.immediate(), true), + // implicit warmup with parallelism=10, subscribes to allocator from Schedulers.single, expect success + Arguments.of(false, 10, Schedulers.single(), true), + // implicit warmup with parallelism=1, subscribes to allocator from Schedulers.single, expect success + Arguments.of(false, 1, Schedulers.single(), true), + // implicit warmup with parallelism=1, subscribes to allocator from current thread, expect failure + Arguments.of(false, 1, Schedulers.immediate(), false) ); } @@ -144,7 +152,7 @@ final static class DBConnectionPool { final InstrumentedPool pool; final static AtomicInteger roundRobin = new AtomicInteger(); - DBConnectionPool(int poolSize, boolean enableParallelWarmup, Scheduler allocatorSubscribeScheduler) { + DBConnectionPool(int poolSize, int warmupParallelism, Scheduler allocatorSubscribeScheduler) { this.poolSize = poolSize; this.dbThreads = new DBConnectionThread[poolSize]; IntStream.range(0, poolSize).forEach(i -> dbThreads[i] = new DBConnectionThread("dbthread-" + i)); @@ -162,9 +170,8 @@ final static class DBConnectionPool { .publishOn(Schedulers.fromExecutor(dbThread)); }) .subscribeOn(allocatorSubscribeScheduler)) - .sizeBetween(10, 10) + .sizeBetween(10, 10, warmupParallelism) .idleResourceReuseOrder(false) - .parallelizeWarmup(enableParallelWarmup) .buildPool(); } @@ -186,9 +193,9 @@ void stop() { @ParameterizedTest @MethodSource("warmupTestArgs") - void warmupTest(boolean doWarmup, boolean enableParallelWarmup, Scheduler allocatorSubscribeScheduler, boolean expectSuccess) { + void warmupTest(boolean doWarmup, int warmupParallelism, Scheduler allocatorSubscribeScheduler, boolean expectSuccess) { int poolSize = 10; - DBConnectionPool dbConnectionPool = new DBConnectionPool(poolSize, enableParallelWarmup, allocatorSubscribeScheduler); + DBConnectionPool dbConnectionPool = new DBConnectionPool(poolSize, warmupParallelism, allocatorSubscribeScheduler); try { InstrumentedPool pool = dbConnectionPool.getPool(); From c6ddf8b593f8504711bccaa51484e8761c48c317 Mon Sep 17 00:00:00 2001 From: Pierre De Rop Date: Fri, 7 Jul 2023 16:33:38 +0200 Subject: 
[PATCH 9/9] Applied feedback: reverted changes done in warmupMono, and applied suggestion in drainloop. --- .../java/reactor/pool/SimpleDequePool.java | 23 +++++++++---------- 1 file changed, 11 insertions(+), 12 deletions(-) diff --git a/reactor-pool/src/main/java/reactor/pool/SimpleDequePool.java b/reactor-pool/src/main/java/reactor/pool/SimpleDequePool.java index dd4f84d8..7a80131f 100644 --- a/reactor-pool/src/main/java/reactor/pool/SimpleDequePool.java +++ b/reactor-pool/src/main/java/reactor/pool/SimpleDequePool.java @@ -27,6 +27,7 @@ import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; import java.util.function.BiPredicate; +import java.util.function.Function; import org.reactivestreams.Publisher; import org.reactivestreams.Subscription; @@ -43,7 +44,6 @@ import reactor.core.scheduler.Schedulers; import reactor.util.Loggers; import reactor.util.annotation.Nullable; -import reactor.util.concurrent.Queues; /** * The {@link SimpleDequePool} is based on {@link Deque} for idle resources and pending {@link Pool#acquire()} Monos, @@ -446,15 +446,15 @@ else if (sig.isOnError()) { logger.debug("should warm up {} extra resources", toWarmup); final long startWarmupIteration = clock.millis(); - Flux> monos = Flux.range(0, toWarmup + 1) - .map(n -> (n == 0) ? primary : warmupMono(n, toWarmup, startWarmupIteration, allocator)); - - // merge will eagerly subscribe to the allocator from the current thread, but the concurrency + // flatMap will eagerly subscribe to the allocator from the current thread, but the concurrency // can be controlled from configuration - int mergeConcurrency = Math.min(poolConfig.allocationStrategy().warmupParallelism(), toWarmup + 1); - Flux.merge(monos, mergeConcurrency, Queues.XS_BUFFER_SIZE) + final int mergeConcurrency = Math.min(poolConfig.allocationStrategy().warmupParallelism(), toWarmup + 1); + Flux.range(1, toWarmup) + .map(i -> warmupMono(i, toWarmup, startWarmupIteration, allocator).doOnSuccess(__ -> drain())) + .startWith(primary.doOnSuccess(__ -> drain()).then()) + .flatMap(Function.identity(), mergeConcurrency, 1) // since we dont store anything the inner buffer can be simplified .onErrorResume(e -> Mono.empty()) - .subscribe(poolable -> drain(), alreadyPropagatedOrLogged -> drain(), () -> drain()); + .subscribe(aVoid -> { }, alreadyPropagatedOrLogged -> drain(), this::drain); } } } @@ -475,7 +475,7 @@ private Mono allocatorWithScheduler() { return poolConfig.allocator(); } - Mono warmupMono(int index, int max, long startWarmupIteration, Mono allocator) { + Mono warmupMono(int index, int max, long startWarmupIteration, Mono allocator) { return allocator.flatMap(poolable -> { logger.debug("warmed up extra resource {}/{}", index, max); metricsRecorder.recordAllocationSuccessAndLatency( @@ -485,10 +485,9 @@ Mono warmupMono(int index, int max, long startWarmupIteration, Mono tempRef = createSlot(poolable); tempRef.markDestroy(); - return destroyPoolable(tempRef) - .then(Mono.empty()); + return destroyPoolable(tempRef); } - return Mono.just(poolable); + return Mono.empty(); }).onErrorResume(warmupError -> { logger.debug("failed to warm up extra resource {}/{}: {}", index, max, warmupError.toString());
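(Illustration only, not part of the patch series.) The bounded, eager subscription that the reworked drain loop relies on can be observed with a plain Flux whose flatMap concurrency is capped; the 100ms delay and the concurrency of 4 below are arbitrary stand-ins for the allocator latency and warmupParallelism.

import java.time.Duration;
import java.util.function.Function;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class BoundedFlatMapSketch {
	public static void main(String[] args) {
		int toWarmup = 10;
		int mergeConcurrency = 4; // stand-in for warmupParallelism

		Flux.range(1, toWarmup)
		    .map(i -> Mono.fromCallable(() -> "resource-" + i)
		                  .delayElement(Duration.ofMillis(100)))
		    // at most mergeConcurrency inner Monos are subscribed to at a time, with an inner prefetch of 1,
		    // so the 10 simulated allocations complete in roughly ceil(10 / 4) = 3 waves of ~100ms each
		    .flatMap(Function.identity(), mergeConcurrency, 1)
		    .doOnNext(r -> System.out.println("allocated " + r))
		    .blockLast();
	}
}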