Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closing a kafka producer/consumer should not disable metrics from other consumers/producers #11975

Merged
merged 2 commits into from
Aug 14, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import java.util.Optional;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArrayList;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
Expand Down Expand Up @@ -70,6 +71,13 @@ public abstract class AbstractOpenTelemetryMetricsReporterTest {
private static KafkaProducer<byte[], byte[]> producer;
private static KafkaConsumer<byte[], byte[]> consumer;

private static final List<OpenTelemetryMetricsReporter> metricsReporters =
new CopyOnWriteArrayList<>();

static {
OpenTelemetryMetricsReporter.setListener(metricsReporters::add);
}

@BeforeEach
void beforeAll() {
// only start the kafka container the first time this runs
Expand All @@ -90,14 +98,16 @@ void beforeAll() {

@AfterAll
static void afterAll() {
kafka.stop();
producer.close();
consumer.close();
kafka.stop();
}

@AfterEach
void tearDown() {
OpenTelemetryMetricsReporter.resetForTest();
for (OpenTelemetryMetricsReporter metricsReporter : metricsReporters) {
metricsReporter.resetForTest();
}
}

protected abstract InstrumentationExtension testing();
Expand Down Expand Up @@ -186,6 +196,14 @@ private static long countOpenTelemetryMetricsReporters(List<MetricsReporter> met

@Test
void observeMetrics() {
// Firstly create new producer and consumer and close them. This is done tp verify that metrics
// are still produced after closing one producer/consumer. See
// https://github.com/open-telemetry/opentelemetry-java-instrumentation/issues/11880
KafkaProducer<byte[], byte[]> producer2 = new KafkaProducer<>(producerConfig());
KafkaConsumer<byte[], byte[]> consumer2 = new KafkaConsumer<>(consumerConfig());
producer2.close();
consumer2.close();

produceRecords();
consumeRecords();

Expand Down Expand Up @@ -405,7 +423,9 @@ private static void printMappingTable() {
Map<String, List<KafkaMetricId>> kafkaMetricsByGroup =
TestMetricsReporter.seenMetrics.stream().collect(groupingBy(KafkaMetricId::getGroup));
List<RegisteredObservable> registeredObservables =
OpenTelemetryMetricsReporter.getRegisteredObservables();
metricsReporters.stream()
.flatMap(metricsReporter -> metricsReporter.getRegisteredObservables().stream())
.collect(toList());
// Iterate through groups in alpha order
for (String group : kafkaMetricsByGroup.keySet().stream().sorted().collect(toList())) {
List<KafkaMetricId> kafkaMetricIds =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,28 +41,35 @@ public final class OpenTelemetryMetricsReporter implements MetricsReporter {

private static final Logger logger =
Logger.getLogger(OpenTelemetryMetricsReporter.class.getName());
private volatile Meter meter;
private static volatile Listener listener;

private static final Object lock = new Object();
private volatile Meter meter;
private final Object lock = new Object();

@GuardedBy("lock")
private static final List<RegisteredObservable> registeredObservables = new ArrayList<>();
private final List<RegisteredObservable> registeredObservables = new ArrayList<>();

/**
* Reset for test by resetting the {@link #meter} to {@code null} and closing all registered
* instruments.
*/
static void resetForTest() {
void resetForTest() {
closeAllInstruments();
}

// Visible for test
static List<RegisteredObservable> getRegisteredObservables() {
List<RegisteredObservable> getRegisteredObservables() {
synchronized (lock) {
return new ArrayList<>(registeredObservables);
}
}

public OpenTelemetryMetricsReporter() {
if (listener != null) {
listener.metricsReporterCreated(this);
}
}

@Override
public void init(List<KafkaMetric> metrics) {
metrics.forEach(this::metricChange);
Expand Down Expand Up @@ -131,7 +138,7 @@ public void close() {
closeAllInstruments();
}

private static void closeAllInstruments() {
private void closeAllInstruments() {
synchronized (lock) {
for (Iterator<RegisteredObservable> it = registeredObservables.iterator(); it.hasNext(); ) {
closeInstrument(it.next().getObservable());
Expand Down Expand Up @@ -177,4 +184,14 @@ private static <T> T getProperty(Map<String, ?> configs, String key, Class<T> re
}
return (T) value;
}

// Visible for test
static void setListener(Listener listener) {
OpenTelemetryMetricsReporter.listener = listener;
}

// used for testing
interface Listener {
void metricsReporterCreated(OpenTelemetryMetricsReporter metricsReporter);
}
}
Loading