Skip to content

Commit

Permalink
Call init() on delegate from TimedScheduler (#3876)
Browse files Browse the repository at this point in the history
The `TimedScheduler` class doesn't currently provide an override for the
`init()` method, which means it calls the default implementation on the
interface that delegates to `start()`. Most existing schedulers have an
implementation of `start()` and it's not a problem for them. However,
the newer `BoundedElasticThreadPerTaskScheduler` throws an error for
`start()`, so wrapping it in a `TimedScheduler` causes it to crash
immediately when `init()` gets called.

With this change, the wrapped scheduler's `init()` method is called
instead and allows users to get metrics for the virtual thread
schedulers.

---------

Co-authored-by: Dariusz Jędrzejczyk <[email protected]>
  • Loading branch information
luukveenis and chemicL authored Aug 22, 2024
1 parent 63d25e9 commit b732811
Show file tree
Hide file tree
Showing 2 changed files with 59 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,11 @@ public void start() {
delegate.start();
}

@Override
public void init() {
delegate.init();
}

static final class TimedWorker implements Worker {

final TimedScheduler parent;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,10 @@

package reactor.core.observability.micrometer;

import java.lang.reflect.Field;
import java.time.Duration;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
Expand All @@ -30,7 +28,6 @@
import io.micrometer.core.instrument.LongTaskTimer;
import io.micrometer.core.instrument.MockClock;
import io.micrometer.core.instrument.Tags;
import io.micrometer.core.instrument.internal.DefaultLongTaskTimer;
import io.micrometer.core.instrument.search.RequiredSearch;
import io.micrometer.core.instrument.simple.SimpleConfig;
import io.micrometer.core.instrument.simple.SimpleMeterRegistry;
Expand Down Expand Up @@ -72,6 +69,60 @@ void closeRegistry() {
registry.close();
}

@Test
void supportsBothDeprecatedAndNonRestartableSchedulers() {
Scheduler deprecatedScheduler = new Scheduler() {

@Override
public Disposable schedule(Runnable task) {
return Disposables.disposed();
}

@Override
public Worker createWorker() {
throw new UnsupportedOperationException();
}
};

Scheduler nonRestartableScheduler = new Scheduler() {
@Override
public Disposable schedule(Runnable task) {
return Disposables.disposed();
}

@Override
public Worker createWorker() {
throw new UnsupportedOperationException();
}

@SuppressWarnings("deprecation")
@Override
public void start() {
throw new UnsupportedOperationException();
}

@Override
public void init() {
}
};

TimedScheduler timedDeprecatedScheduler =
new TimedScheduler(deprecatedScheduler, registry, "test", Tags.empty());

TimedScheduler timedNonRestartableScheduler =
new TimedScheduler(nonRestartableScheduler, registry, "test", Tags.empty());

assertThatNoException().isThrownBy(() -> {
timedDeprecatedScheduler.init();
timedDeprecatedScheduler.start();
});

assertThatNoException().isThrownBy(timedNonRestartableScheduler::init);

assertThatExceptionOfType(UnsupportedOperationException.class)
.isThrownBy(timedNonRestartableScheduler::start);
}

@Test
void aDotIsAddedToPrefix() {
TimedScheduler test = new TimedScheduler(Schedulers.immediate(), registry, "noDot", Tags.empty());
Expand Down

0 comments on commit b732811

Please sign in to comment.