diff --git a/datahub-upgrade/src/main/resources/application.properties b/datahub-upgrade/src/main/resources/application.properties
index b884c92f74bd48..847c264dfac38c 100644
--- a/datahub-upgrade/src/main/resources/application.properties
+++ b/datahub-upgrade/src/main/resources/application.properties
@@ -3,3 +3,4 @@ management.health.neo4j.enabled=false
ingestion.enabled=false
spring.main.allow-bean-definition-overriding=true
entityClient.impl=restli
+metadataChangeProposal.throttle.updateIntervalMs=0
\ No newline at end of file
diff --git a/datahub-upgrade/src/test/java/com/linkedin/datahub/upgrade/UpgradeCliApplicationTest.java b/datahub-upgrade/src/test/java/com/linkedin/datahub/upgrade/UpgradeCliApplicationTest.java
index dc4c3073ee351c..8b6899b4c78866 100644
--- a/datahub-upgrade/src/test/java/com/linkedin/datahub/upgrade/UpgradeCliApplicationTest.java
+++ b/datahub-upgrade/src/test/java/com/linkedin/datahub/upgrade/UpgradeCliApplicationTest.java
@@ -1,12 +1,18 @@
package com.linkedin.datahub.upgrade;
-import static org.testng.AssertJUnit.*;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertTrue;
import com.linkedin.datahub.upgrade.restoreindices.RestoreIndices;
import com.linkedin.datahub.upgrade.system.BlockingSystemUpgrade;
+import com.linkedin.metadata.dao.throttle.NoOpSensor;
+import com.linkedin.metadata.dao.throttle.ThrottleSensor;
import com.linkedin.metadata.search.elasticsearch.indexbuilder.ESIndexBuilder;
import javax.inject.Named;
import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.ActiveProfiles;
import org.springframework.test.context.testng.AbstractTestNGSpringContextTests;
@@ -28,6 +34,10 @@ public class UpgradeCliApplicationTest extends AbstractTestNGSpringContextTests
@Autowired private ESIndexBuilder esIndexBuilder;
+ @Qualifier("kafkaThrottle")
+ @Autowired
+ private ThrottleSensor kafkaThrottle;
+
@Test
public void testRestoreIndicesInit() {
/*
@@ -46,4 +56,10 @@ public void testBuildIndicesInit() {
assertFalse(
esIndexBuilder.getElasticSearchConfiguration().getBuildIndices().isAllowDocCountMismatch());
}
+
+ @Test
+ public void testNoThrottle() {
+ assertEquals(
+ new NoOpSensor(), kafkaThrottle, "No kafka throttle controls expected in datahub-upgrade");
+ }
}
diff --git a/docs/authorization/policies.md b/docs/authorization/policies.md
index 45d0b59e408337..5c99241f75190f 100644
--- a/docs/authorization/policies.md
+++ b/docs/authorization/policies.md
@@ -146,15 +146,15 @@ These privileges are for DataHub operators to access & manage the administrative
#### Access & Credentials
-| Platform Privileges | Description |
-|--------------------------------------|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
-| Generate Personal Access Tokens | Allow actor to generate personal access tokens for use with DataHub APIs. |
-| Manage Policies | Allow actor to create and remove access control policies. Be careful - Actors with this privilege are effectively super users. |
-| Manage Secrets | Allow actor to create & remove Secrets stored inside DataHub. |
-| Manage Users & Groups | Allow actor to create, remove, and update users and groups on DataHub. |
-| Manage All Access Tokens | Allow actor to create, list and revoke access tokens on behalf of users in DataHub. Be careful - Actors with this privilege are effectively super users that can impersonate other users. |
-| Manage User Credentials | Allow actor to manage credentials for native DataHub users, including inviting new users and resetting passwords | |
-| Manage Connections | Allow actor to manage connections to external DataHub platforms. |
+| Platform Privileges | Description |
+|---------------------------------|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
+| Generate Personal Access Tokens | Allow actor to generate personal access tokens for use with DataHub APIs. |
+| Manage Policies | Allow actor to create and remove access control policies. Be careful - Actors with this privilege are effectively super users. |
+| Manage Secrets | Allow actor to create & remove Secrets stored inside DataHub. |
+| Manage Users & Groups | Allow actor to create, remove, and update users and groups on DataHub. |
+| Manage All Access Tokens | Allow actor to create, list and revoke access tokens on behalf of users in DataHub. Be careful - Actors with this privilege are effectively super users that can impersonate other users. |
+| Manage User Credentials | Allow actor to manage credentials for native DataHub users, including inviting new users and resetting passwords | |
+| Manage Connections | Allow actor to manage connections to external DataHub platforms. |
#### Product Features
@@ -191,15 +191,16 @@ These privileges are for DataHub operators to access & manage the administrative
#### System Management
-| Platform Privileges | Description |
-|-----------------------------------------------|--------------------------------------------------------------------------|
-| Restore Indices API[^1] | Allow actor to use the Restore Indices API. | |
-| Get Timeseries index sizes API[^1] | Allow actor to use the get Timeseries indices size API. |
-| Truncate timeseries aspect index size API[^1] | Allow actor to use the API to truncate a timeseries index. |
-| Get ES task status API[^1] | Allow actor to use the get task status API for an ElasticSearch task. |
-| Enable/Disable Writeability API[^1] | Allow actor to enable or disable GMS writeability for data migrations. |
-| Apply Retention API[^1] | Allow actor to apply retention using the API. |
-| Analytics API access[^1] | Allow actor to use API read access to raw analytics data. |
+| Platform Privileges | Description |
+|-----------------------------------------------|------------------------------------------------------------------------|
+| Restore Indices API[^1] | Allow actor to use the Restore Indices API. | |
+| Get Timeseries index sizes API[^1] | Allow actor to use the get Timeseries indices size API. |
+| Truncate timeseries aspect index size API[^1] | Allow actor to use the API to truncate a timeseries index. |
+| Get ES task status API[^1] | Allow actor to use the get task status API for an ElasticSearch task. |
+| Enable/Disable Writeability API[^1] | Allow actor to enable or disable GMS writeability for data migrations. |
+| Apply Retention API[^1] | Allow actor to apply retention using the API. |
+| Analytics API access[^1] | Allow actor to use API read access to raw analytics data. |
+| Manage System Operations | Allow actor to manage system operation controls. |
[^1]: Only active if REST_API_AUTHORIZATION_ENABLED is true
[^2]: DataHub Cloud only
diff --git a/docs/deploy/environment-vars.md b/docs/deploy/environment-vars.md
index 21ed738e878f88..6429996c088b4a 100644
--- a/docs/deploy/environment-vars.md
+++ b/docs/deploy/environment-vars.md
@@ -14,21 +14,21 @@ DataHub works.
| `UI_INGESTION_ENABLED` | `true` | boolean | [`GMS`, `MCE Consumer`] | Enable UI based ingestion. |
| `DATAHUB_ANALYTICS_ENABLED` | `true` | boolean | [`Frontend`, `GMS`] | Collect DataHub usage to populate the analytics dashboard. |
| `BOOTSTRAP_SYSTEM_UPDATE_WAIT_FOR_SYSTEM_UPDATE` | `true` | boolean | [`GMS`, `MCE Consumer`, `MAE Consumer`] | Do not wait for the `system-update` to complete before starting. This should typically only be disabled during development. |
-| `ER_MODEL_RELATIONSHIP_FEATURE_ENABLED` | `false` | boolean | [`Frontend`, `GMS`] | Enable ER Model Relation Feature that shows Relationships Tab within a Dataset UI. |
+| `ER_MODEL_RELATIONSHIP_FEATURE_ENABLED` | `false` | boolean | [`Frontend`, `GMS`] | Enable ER Model Relation Feature that shows Relationships Tab within a Dataset UI. |
## Ingestion
-| Variable | Default | Unit/Type | Components | Description |
-|------------------------------------|---------|-----------|-------------------------|-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
-| `ASYNC_INGEST_DEFAULT` | `false` | boolean | [`GMS`] | Asynchronously process ingestProposals by writing the ingestion MCP to Kafka. Typically enabled with standalone consumers. |
-| `MCP_CONSUMER_ENABLED` | `true` | boolean | [`GMS`, `MCE Consumer`] | When running in standalone mode, disabled on `GMS` and enabled on separate `MCE Consumer`. |
-| `MCL_CONSUMER_ENABLED` | `true` | boolean | [`GMS`, `MAE Consumer`] | When running in standalone mode, disabled on `GMS` and enabled on separate `MAE Consumer`. |
-| `PE_CONSUMER_ENABLED` | `true` | boolean | [`GMS`, `MAE Consumer`] | When running in standalone mode, disabled on `GMS` and enabled on separate `MAE Consumer`. |
-| `ES_BULK_REQUESTS_LIMIT` | 1000 | docs | [`GMS`, `MAE Consumer`] | Number of bulk documents to index. `MAE Consumer` if standalone. |
-| `ES_BULK_FLUSH_PERIOD` | 1 | seconds | [`GMS`, `MAE Consumer`] | How frequently indexed documents are made available for query. |
-| `ALWAYS_EMIT_CHANGE_LOG` | `false` | boolean | [`GMS`] | Enables always emitting a MCL even when no changes are detected. Used for Time Based Lineage when no changes occur. | |
-| `GRAPH_SERVICE_DIFF_MODE_ENABLED` | `true` | boolean | [`GMS`] | Enables diff mode for graph writes, uses a different code path that produces a diff from previous to next to write relationships instead of wholesale deleting edges and reading. |
+| Variable | Default | Unit/Type | Components | Description |
+|-----------------------------------|---------|-----------|-------------------------|------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
+| `ASYNC_INGEST_DEFAULT` | `false` | boolean | [`GMS`] | Asynchronously process ingestProposals by writing the ingestion MCP to Kafka. Typically enabled with standalone consumers. |
+| `MCP_CONSUMER_ENABLED` | `true` | boolean | [`GMS`, `MCE Consumer`] | When running in standalone mode, disabled on `GMS` and enabled on separate `MCE Consumer`. |
+| `MCL_CONSUMER_ENABLED` | `true` | boolean | [`GMS`, `MAE Consumer`] | When running in standalone mode, disabled on `GMS` and enabled on separate `MAE Consumer`. |
+| `PE_CONSUMER_ENABLED` | `true` | boolean | [`GMS`, `MAE Consumer`] | When running in standalone mode, disabled on `GMS` and enabled on separate `MAE Consumer`. |
+| `ES_BULK_REQUESTS_LIMIT` | 1000 | docs | [`GMS`, `MAE Consumer`] | Number of bulk documents to index. `MAE Consumer` if standalone. |
+| `ES_BULK_FLUSH_PERIOD` | 1 | seconds | [`GMS`, `MAE Consumer`] | How frequently indexed documents are made available for query. |
+| `ALWAYS_EMIT_CHANGE_LOG` | `false` | boolean | [`GMS`] | Enables always emitting a MCL even when no changes are detected. Used for Time Based Lineage when no changes occur. | |
+| `GRAPH_SERVICE_DIFF_MODE_ENABLED` | `true` | boolean | [`GMS`] | Enables diff mode for graph writes, uses a different code path that produces a diff from previous to next to write relationships instead of wholesale deleting edges and reading. |
## Caching
diff --git a/lombok.config b/lombok.config
index df71bb6a0fb878..7324b9265c5203 100644
--- a/lombok.config
+++ b/lombok.config
@@ -1,2 +1,3 @@
config.stopBubbling = true
lombok.addLombokGeneratedAnnotation = true
+lombok.copyableAnnotations += org.springframework.beans.factory.annotation.Qualifier
diff --git a/metadata-dao-impl/kafka-producer/src/main/java/com/datahub/metadata/dao/producer/KafkaProducerThrottle.java b/metadata-dao-impl/kafka-producer/src/main/java/com/datahub/metadata/dao/throttle/KafkaThrottleSensor.java
similarity index 61%
rename from metadata-dao-impl/kafka-producer/src/main/java/com/datahub/metadata/dao/producer/KafkaProducerThrottle.java
rename to metadata-dao-impl/kafka-producer/src/main/java/com/datahub/metadata/dao/throttle/KafkaThrottleSensor.java
index 8fbb34b1eacd6f..2adf2543aa2f77 100644
--- a/metadata-dao-impl/kafka-producer/src/main/java/com/datahub/metadata/dao/producer/KafkaProducerThrottle.java
+++ b/metadata-dao-impl/kafka-producer/src/main/java/com/datahub/metadata/dao/throttle/KafkaThrottleSensor.java
@@ -1,20 +1,31 @@
-package com.datahub.metadata.dao.producer;
+package com.datahub.metadata.dao.throttle;
+
+import static com.linkedin.metadata.dao.throttle.ThrottleType.MCL_TIMESERIES_LAG;
+import static com.linkedin.metadata.dao.throttle.ThrottleType.MCL_VERSIONED_LAG;
import com.codahale.metrics.Gauge;
import com.google.common.annotations.VisibleForTesting;
import com.linkedin.metadata.config.MetadataChangeProposalConfig;
+import com.linkedin.metadata.dao.throttle.ThrottleControl;
+import com.linkedin.metadata.dao.throttle.ThrottleEvent;
+import com.linkedin.metadata.dao.throttle.ThrottleSensor;
+import com.linkedin.metadata.dao.throttle.ThrottleType;
import com.linkedin.metadata.models.registry.EntityRegistry;
import com.linkedin.metadata.utils.metrics.MetricUtils;
import com.linkedin.util.Pair;
+import java.util.ArrayList;
import java.util.Collection;
+import java.util.Comparator;
+import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
-import java.util.function.Consumer;
+import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import javax.annotation.Nonnull;
@@ -27,23 +38,43 @@
import org.springframework.util.backoff.BackOffExecution;
import org.springframework.util.backoff.ExponentialBackOff;
+/**
+ * This class is designed to monitor MCL consumption by a specific consumer group and provide
+ * throttling hooks.
+ *
+ *
Initially this was designed for throttling the async mcp processor `mce-consumer`, however it
+ * also handles throttling synchronous requests via rest.li, graphql, and openapi for non-browser
+ * based requests.
+ */
@Slf4j
@Builder(toBuilder = true)
-public class KafkaProducerThrottle {
+public class KafkaThrottleSensor implements ThrottleSensor {
+ private static final Set SUPPORTED_THROTTLE_TYPES =
+ Set.of(MCL_VERSIONED_LAG, MCL_TIMESERIES_LAG);
@Nonnull private final EntityRegistry entityRegistry;
@Nonnull private final Admin kafkaAdmin;
@Nonnull private final MetadataChangeProposalConfig.ThrottlesConfig config;
@Nonnull private final String mclConsumerGroupId;
@Nonnull private final String versionedTopicName;
@Nonnull private final String timeseriesTopicName;
- @Nonnull private final Consumer pauseConsumer;
+
+ /** A list of throttle event listeners to execute when throttling occurs and ceases */
+ @Builder.Default @Nonnull
+ private final List> throttleCallbacks =
+ new ArrayList<>();
private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
- private final Map medianLag = new ConcurrentHashMap<>();
- private final Map backoffMap = new ConcurrentHashMap<>();
+ private final Map medianLag = new ConcurrentHashMap<>();
+ private final Map backoffMap = new ConcurrentHashMap<>();
+
+ @Override
+ public KafkaThrottleSensor addCallback(Function callback) {
+ throttleCallbacks.add(callback);
+ return this;
+ }
/** Update lag information at a given rate */
- public KafkaProducerThrottle start() {
+ public KafkaThrottleSensor start() {
if ((config.getVersioned().isEnabled() || config.getTimeseries().isEnabled())
&& config.getUpdateIntervalMs() > 0) {
scheduler.scheduleAtFixedRate(
@@ -79,13 +110,13 @@ public void stop() {
* @return median lag per mcl topic
*/
@VisibleForTesting
- public Map getLag() {
+ public Map getLag() {
return medianLag.entrySet().stream()
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
}
@VisibleForTesting
- public boolean isThrottled(MclType mclType) {
+ public boolean isThrottled(ThrottleType mclType) {
if (getThrottleConfig(mclType).isEnabled() && medianLag.containsKey(mclType)) {
return medianLag.get(mclType) > getThrottleConfig(mclType).getThreshold();
}
@@ -93,7 +124,7 @@ public boolean isThrottled(MclType mclType) {
}
@VisibleForTesting
- public long computeNextBackOff(MclType mclType) {
+ public long computeNextBackOff(ThrottleType mclType) {
if (isThrottled(mclType)) {
BackOffExecution backOffExecution =
backoffMap.computeIfAbsent(
@@ -115,54 +146,61 @@ public long computeNextBackOff(MclType mclType) {
@VisibleForTesting
public void throttle() throws InterruptedException {
- for (MclType mclType : MclType.values()) {
- if (isThrottled(mclType)) {
- long backoffWaitMs = computeNextBackOff(mclType);
-
- if (backoffWaitMs > 0) {
- log.warn(
- "Throttled producer Topic: {} Duration: {} ms MedianLag: {}",
- getTopicName(mclType),
- backoffWaitMs,
- medianLag.get(mclType));
- MetricUtils.gauge(
- this.getClass(),
- String.format("%s_throttled", getTopicName(mclType)),
- () -> (Gauge>) () -> 1);
- MetricUtils.counter(
- this.getClass(), String.format("%s_throttledCount", getTopicName(mclType)))
- .inc();
-
- log.info("Pausing MCE consumer for {} ms.", backoffWaitMs);
- pauseConsumer.accept(true);
- Thread.sleep(backoffWaitMs);
- log.info("Resuming MCE consumer.");
- pauseConsumer.accept(false);
-
- // if throttled for one topic, skip remaining
- return;
- } else {
- // no throttle or exceeded configuration limits
- log.info("MCE consumer throttle exponential backoff reset.");
- backoffMap.remove(mclType);
- MetricUtils.gauge(
- this.getClass(),
- String.format("%s_throttled", getTopicName(mclType)),
- () -> (Gauge>) () -> 0);
- }
- } else {
+
+ Map throttled = new LinkedHashMap<>();
+
+ for (ThrottleType mclType : SUPPORTED_THROTTLE_TYPES) {
+ long backoffWaitMs = computeNextBackOff(mclType);
+
+ if (backoffWaitMs <= 0) {
// not throttled, remove backoff tracking
- log.info("MCE consumer throttle exponential backoff reset.");
+ log.info("Throttle exponential backoff reset.");
backoffMap.remove(mclType);
MetricUtils.gauge(
this.getClass(),
String.format("%s_throttled", getTopicName(mclType)),
() -> (Gauge>) () -> 0);
+ } else {
+ throttled.put(mclType, backoffWaitMs);
+ }
+ }
+
+ // handle throttled
+ if (!throttled.isEmpty()) {
+ long maxBackoffWaitMs = throttled.values().stream().max(Comparator.naturalOrder()).get();
+ log.warn(
+ "Throttled Topic: {} Duration: {} ms MedianLag: {}",
+ throttled.keySet().stream().map(this::getTopicName).collect(Collectors.toList()),
+ maxBackoffWaitMs,
+ throttled.keySet().stream().map(medianLag::get).collect(Collectors.toList()));
+
+ throttled.keySet().stream()
+ .forEach(
+ mclType -> {
+ MetricUtils.gauge(
+ this.getClass(),
+ String.format("%s_throttled", getTopicName(mclType)),
+ () -> (Gauge>) () -> 1);
+ MetricUtils.counter(
+ this.getClass(), String.format("%s_throttledCount", getTopicName(mclType)))
+ .inc();
+ });
+
+ log.info("Throttling {} callbacks for {} ms.", throttleCallbacks.size(), maxBackoffWaitMs);
+ final ThrottleEvent throttleEvent = ThrottleEvent.throttle(throttled);
+ List throttleControls =
+ throttleCallbacks.stream().map(callback -> callback.apply(throttleEvent)).toList();
+
+ if (throttleControls.stream().anyMatch(ThrottleControl::hasCallback)) {
+ Thread.sleep(maxBackoffWaitMs);
+ log.info("Resuming {} callbacks after wait.", throttleControls.size());
+ throttleControls.forEach(
+ control -> control.execute(ThrottleEvent.clearThrottle(throttleEvent)));
}
}
}
- private Map getMedianLag() {
+ private Map getMedianLag() {
try {
Map mclConsumerOffsets =
kafkaAdmin
@@ -183,11 +221,11 @@ private Map getMedianLag() {
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
return Stream.of(
- Pair.of(MclType.VERSIONED, versionedTopicName),
- Pair.of(MclType.TIMESERIES, timeseriesTopicName))
+ Pair.of(MCL_VERSIONED_LAG, versionedTopicName),
+ Pair.of(MCL_TIMESERIES_LAG, timeseriesTopicName))
.map(
topic -> {
- MclType mclType = topic.getFirst();
+ ThrottleType mclType = topic.getFirst();
String topicName = topic.getSecond();
Map topicOffsets =
@@ -212,22 +250,22 @@ private Map getMedianLag() {
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
} catch (ExecutionException | InterruptedException e) {
log.error("Error fetching consumer group offsets.", e);
- return Map.of(MclType.VERSIONED, 0L, MclType.TIMESERIES, 0L);
+ return Map.of(MCL_VERSIONED_LAG, 0L, MCL_TIMESERIES_LAG, 0L);
}
}
- private MetadataChangeProposalConfig.ThrottleConfig getThrottleConfig(MclType mclType) {
+ private MetadataChangeProposalConfig.ThrottleConfig getThrottleConfig(ThrottleType mclType) {
MetadataChangeProposalConfig.ThrottleConfig throttleConfig;
switch (mclType) {
- case VERSIONED -> throttleConfig = config.getVersioned();
- case TIMESERIES -> throttleConfig = config.getTimeseries();
+ case MCL_VERSIONED_LAG -> throttleConfig = config.getVersioned();
+ case MCL_TIMESERIES_LAG -> throttleConfig = config.getTimeseries();
default -> throw new IllegalStateException();
}
return throttleConfig;
}
- private String getTopicName(MclType mclType) {
- return MclType.TIMESERIES.equals(mclType) ? timeseriesTopicName : versionedTopicName;
+ private String getTopicName(ThrottleType mclType) {
+ return MCL_TIMESERIES_LAG.equals(mclType) ? timeseriesTopicName : versionedTopicName;
}
private static Double getMedian(Collection listValues) {
@@ -238,9 +276,4 @@ private static Double getMedian(Collection listValues) {
else median = values[values.length / 2];
return median;
}
-
- public enum MclType {
- TIMESERIES,
- VERSIONED
- }
}
diff --git a/metadata-dao-impl/kafka-producer/src/test/java/com/datahub/metadata/dao/producer/KafkaProducerThrottleTest.java b/metadata-dao-impl/kafka-producer/src/test/java/com/datahub/metadata/dao/throttle/KafkaThrottleSensorTest.java
similarity index 80%
rename from metadata-dao-impl/kafka-producer/src/test/java/com/datahub/metadata/dao/producer/KafkaProducerThrottleTest.java
rename to metadata-dao-impl/kafka-producer/src/test/java/com/datahub/metadata/dao/throttle/KafkaThrottleSensorTest.java
index ce6104ee2ca7dc..6f82ad86852992 100644
--- a/metadata-dao-impl/kafka-producer/src/test/java/com/datahub/metadata/dao/producer/KafkaProducerThrottleTest.java
+++ b/metadata-dao-impl/kafka-producer/src/test/java/com/datahub/metadata/dao/throttle/KafkaThrottleSensorTest.java
@@ -1,4 +1,4 @@
-package com.datahub.metadata.dao.producer;
+package com.datahub.metadata.dao.throttle;
import static org.mockito.ArgumentMatchers.anyMap;
import static org.mockito.ArgumentMatchers.anyString;
@@ -14,6 +14,8 @@
import static org.testng.Assert.assertTrue;
import com.linkedin.metadata.config.MetadataChangeProposalConfig;
+import com.linkedin.metadata.dao.throttle.ThrottleControl;
+import com.linkedin.metadata.dao.throttle.ThrottleType;
import com.linkedin.metadata.models.registry.EntityRegistry;
import com.linkedin.mxe.Topics;
import com.linkedin.util.Pair;
@@ -34,7 +36,7 @@
import org.apache.kafka.common.TopicPartition;
import org.testng.annotations.Test;
-public class KafkaProducerThrottleTest {
+public class KafkaThrottleSensorTest {
private static final List STANDARD_TOPICS =
List.of(Topics.METADATA_CHANGE_LOG_VERSIONED, Topics.METADATA_CHANGE_LOG_TIMESERIES);
private static final String STANDARD_MCL_CONSUMER_GROUP_ID = "generic-mae-consumer-job-client";
@@ -54,16 +56,16 @@ public void testLagCalculation() throws ExecutionException, InterruptedException
topicPart -> ((long) topicPart.partition() + 1) * 2,
3));
- KafkaProducerThrottle test =
- KafkaProducerThrottle.builder()
+ KafkaThrottleSensor test =
+ KafkaThrottleSensor.builder()
.config(noSchedulerConfig().getThrottle())
.kafkaAdmin(mockAdmin)
.versionedTopicName(STANDARD_TOPICS.get(0))
.timeseriesTopicName(STANDARD_TOPICS.get(1))
.entityRegistry(mock(EntityRegistry.class))
.mclConsumerGroupId(STANDARD_MCL_CONSUMER_GROUP_ID)
- .pauseConsumer(mock(Consumer.class))
- .build();
+ .build()
+ .addCallback((throttleEvent -> ThrottleControl.NONE));
// Refresh calculations
test.refresh();
@@ -71,8 +73,8 @@ public void testLagCalculation() throws ExecutionException, InterruptedException
assertEquals(
test.getLag(),
Map.of(
- KafkaProducerThrottle.MclType.VERSIONED, 2L,
- KafkaProducerThrottle.MclType.TIMESERIES, 2L));
+ ThrottleType.MCL_VERSIONED_LAG, 2L,
+ ThrottleType.MCL_TIMESERIES_LAG, 2L));
}
@Test
@@ -111,45 +113,52 @@ public void testThrottle() throws ExecutionException, InterruptedException {
Consumer pauseFunction = mock(Consumer.class);
- KafkaProducerThrottle test =
- KafkaProducerThrottle.builder()
+ KafkaThrottleSensor test =
+ KafkaThrottleSensor.builder()
.config(noThrottleConfig)
.kafkaAdmin(mockAdmin)
.versionedTopicName(STANDARD_TOPICS.get(0))
.timeseriesTopicName(STANDARD_TOPICS.get(1))
.entityRegistry(mock(EntityRegistry.class))
.mclConsumerGroupId(STANDARD_MCL_CONSUMER_GROUP_ID)
- .pauseConsumer(pauseFunction)
- .build();
+ .build()
+ .addCallback(
+ (throttleEvent -> {
+ pauseFunction.accept(throttleEvent.isThrottled());
+ return ThrottleControl.builder()
+ .callback(
+ throttleResume -> pauseFunction.accept(throttleResume.isThrottled()))
+ .build();
+ }));
// Refresh calculations
test.refresh();
assertEquals(
test.getLag(),
Map.of(
- KafkaProducerThrottle.MclType.VERSIONED, 2L,
- KafkaProducerThrottle.MclType.TIMESERIES, 2L));
+ ThrottleType.MCL_VERSIONED_LAG, 2L,
+ ThrottleType.MCL_TIMESERIES_LAG, 2L));
assertFalse(
- test.isThrottled(KafkaProducerThrottle.MclType.VERSIONED),
+ test.isThrottled(ThrottleType.MCL_VERSIONED_LAG),
"Expected not throttling, lag is below threshold");
- assertFalse(test.isThrottled(KafkaProducerThrottle.MclType.TIMESERIES));
+ assertFalse(test.isThrottled(ThrottleType.MCL_TIMESERIES_LAG));
test.throttle();
verifyNoInteractions(pauseFunction);
reset(pauseFunction);
- KafkaProducerThrottle test2 = test.toBuilder().config(throttleConfig).build();
+ KafkaThrottleSensor test2 = test.toBuilder().config(throttleConfig).build();
// Refresh calculations
test2.refresh();
assertEquals(
test2.getLag(),
Map.of(
- KafkaProducerThrottle.MclType.VERSIONED, 2L,
- KafkaProducerThrottle.MclType.TIMESERIES, 2L));
+ ThrottleType.MCL_VERSIONED_LAG, 2L,
+ ThrottleType.MCL_TIMESERIES_LAG, 2L));
assertTrue(
- test2.isThrottled(KafkaProducerThrottle.MclType.VERSIONED),
+ test2.isThrottled(ThrottleType.MCL_VERSIONED_LAG),
"Expected throttling, lag is above threshold.");
assertFalse(
- test2.isThrottled(KafkaProducerThrottle.MclType.TIMESERIES),
+ test2.isThrottled(ThrottleType.MCL_TIMESERIES_LAG),
"Expected not throttling. Timeseries is disabled");
test2.throttle();
@@ -183,56 +192,48 @@ public void testBackOff() throws ExecutionException, InterruptedException {
topicPart -> ((long) topicPart.partition() + 1) * 2,
3));
- KafkaProducerThrottle test =
- KafkaProducerThrottle.builder()
+ KafkaThrottleSensor test =
+ KafkaThrottleSensor.builder()
.config(throttleConfig)
.kafkaAdmin(mockAdmin)
.versionedTopicName(STANDARD_TOPICS.get(0))
.timeseriesTopicName(STANDARD_TOPICS.get(1))
.entityRegistry(mock(EntityRegistry.class))
.mclConsumerGroupId(STANDARD_MCL_CONSUMER_GROUP_ID)
- .pauseConsumer(mock(Consumer.class))
- .build();
+ .build()
+ .addCallback((throttleEvent -> ThrottleControl.NONE));
// Refresh calculations
test.refresh();
assertEquals(
test.getLag(),
Map.of(
- KafkaProducerThrottle.MclType.VERSIONED, 2L,
- KafkaProducerThrottle.MclType.TIMESERIES, 2L));
+ ThrottleType.MCL_VERSIONED_LAG, 2L,
+ ThrottleType.MCL_TIMESERIES_LAG, 2L));
assertTrue(
- test.isThrottled(KafkaProducerThrottle.MclType.VERSIONED),
+ test.isThrottled(ThrottleType.MCL_VERSIONED_LAG),
"Expected throttling, lag is above threshold.");
assertFalse(
- test.isThrottled(KafkaProducerThrottle.MclType.TIMESERIES),
+ test.isThrottled(ThrottleType.MCL_TIMESERIES_LAG),
"Expected no throttling. Timeseries is disabled");
assertEquals(
- test.computeNextBackOff(KafkaProducerThrottle.MclType.TIMESERIES),
+ test.computeNextBackOff(ThrottleType.MCL_TIMESERIES_LAG),
0L,
"Expected no backoff. Timeseries is disabled.");
+ assertEquals(test.computeNextBackOff(ThrottleType.MCL_VERSIONED_LAG), 1L, "Expected initial 1");
assertEquals(
- test.computeNextBackOff(KafkaProducerThrottle.MclType.VERSIONED), 1L, "Expected initial 1");
+ test.computeNextBackOff(ThrottleType.MCL_VERSIONED_LAG), 2L, "Expected second 2^1");
+ assertEquals(test.computeNextBackOff(ThrottleType.MCL_VERSIONED_LAG), 4L, "Expected third 2^2");
assertEquals(
- test.computeNextBackOff(KafkaProducerThrottle.MclType.VERSIONED),
- 2L,
- "Expected second 2^1");
+ test.computeNextBackOff(ThrottleType.MCL_VERSIONED_LAG), 8L, "Expected fourth 2^3");
assertEquals(
- test.computeNextBackOff(KafkaProducerThrottle.MclType.VERSIONED), 4L, "Expected third 2^2");
- assertEquals(
- test.computeNextBackOff(KafkaProducerThrottle.MclType.VERSIONED),
- 8L,
- "Expected fourth 2^3");
- assertEquals(
- test.computeNextBackOff(KafkaProducerThrottle.MclType.VERSIONED),
+ test.computeNextBackOff(ThrottleType.MCL_VERSIONED_LAG),
8L,
"Expected fifth max interval at 8");
assertEquals(
- test.computeNextBackOff(KafkaProducerThrottle.MclType.VERSIONED),
- -1L,
- "Expected max attempts");
+ test.computeNextBackOff(ThrottleType.MCL_VERSIONED_LAG), -1L, "Expected max attempts");
}
@Test
@@ -253,16 +254,16 @@ public void testScheduler() throws ExecutionException, InterruptedException {
AdminClient mockAdmin =
mockKafka(generateLag(STANDARD_TOPICS, topicPart -> 1L, topicPart -> 2L, 1));
- KafkaProducerThrottle test =
- KafkaProducerThrottle.builder()
+ KafkaThrottleSensor test =
+ KafkaThrottleSensor.builder()
.config(throttlesConfig)
.kafkaAdmin(mockAdmin)
.versionedTopicName(STANDARD_TOPICS.get(0))
.timeseriesTopicName(STANDARD_TOPICS.get(1))
.entityRegistry(mock(EntityRegistry.class))
.mclConsumerGroupId(STANDARD_MCL_CONSUMER_GROUP_ID)
- .pauseConsumer(mock(Consumer.class))
- .build();
+ .build()
+ .addCallback((throttleEvent -> ThrottleControl.NONE));
try {
test.start();
@@ -270,8 +271,8 @@ public void testScheduler() throws ExecutionException, InterruptedException {
assertEquals(
test.getLag(),
Map.of(
- KafkaProducerThrottle.MclType.VERSIONED, 1L,
- KafkaProducerThrottle.MclType.TIMESERIES, 1L),
+ ThrottleType.MCL_VERSIONED_LAG, 1L,
+ ThrottleType.MCL_TIMESERIES_LAG, 1L),
"Expected lag updated");
} finally {
test.stop();
diff --git a/metadata-io/build.gradle b/metadata-io/build.gradle
index 9f5fc109eea7f6..7e72767c08b79c 100644
--- a/metadata-io/build.gradle
+++ b/metadata-io/build.gradle
@@ -27,6 +27,8 @@ dependencies {
implementation externalDependency.guava
implementation externalDependency.reflections
+ // https://mvnrepository.com/artifact/nl.basjes.parse.useragent/yauaa
+ implementation 'nl.basjes.parse.useragent:yauaa:7.27.0'
api(externalDependency.dgraph4j) {
exclude group: 'com.google.guava', module: 'guava'
diff --git a/metadata-io/src/main/java/com/linkedin/metadata/dao/throttle/APIThrottle.java b/metadata-io/src/main/java/com/linkedin/metadata/dao/throttle/APIThrottle.java
new file mode 100644
index 00000000000000..542eb5f3869c01
--- /dev/null
+++ b/metadata-io/src/main/java/com/linkedin/metadata/dao/throttle/APIThrottle.java
@@ -0,0 +1,82 @@
+package com.linkedin.metadata.dao.throttle;
+
+import static com.linkedin.metadata.dao.throttle.ThrottleType.MANUAL;
+import static com.linkedin.metadata.dao.throttle.ThrottleType.MCL_TIMESERIES_LAG;
+import static com.linkedin.metadata.dao.throttle.ThrottleType.MCL_VERSIONED_LAG;
+
+import io.datahubproject.metadata.context.OperationContext;
+import io.datahubproject.metadata.context.RequestContext;
+import java.util.Comparator;
+import java.util.Objects;
+import java.util.Set;
+import java.util.stream.Collectors;
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+import nl.basjes.parse.useragent.UserAgent;
+import nl.basjes.parse.useragent.UserAgentAnalyzer;
+
+public class APIThrottle {
+ private static final Set AGENT_EXEMPTIONS = Set.of("Browser");
+ private static final UserAgentAnalyzer UAA =
+ UserAgentAnalyzer.newBuilder()
+ .hideMatcherLoadStats()
+ .withField(UserAgent.AGENT_CLASS)
+ .withCache(1000)
+ .build();
+
+ private APIThrottle() {}
+
+ /**
+ * This method is expected to be called on sync ingest requests for both timeseries or versioned
+ * aspects.
+ *
+ *
1. Async requests are never expected to be throttled here. 2. UI requests are not expected
+ * to be throttled, so we'll try to detect browser vs non-browser activity. 3. Throttling
+ * exceptions are expected to be caught by the API implementation and converted to a 429 http
+ * status code
+ *
+ * @param opContext the operation context
+ * @param throttleEvents the throttle state
+ * @param isTimeseries whether the operation is for timeseries or not (throttled separately)
+ */
+ public static void evaluate(
+ @Nonnull OperationContext opContext,
+ @Nullable Set throttleEvents,
+ boolean isTimeseries) {
+
+ Set eventMatchMaxWaitMs = eventMatchMaxWaitMs(throttleEvents, isTimeseries);
+
+ if (!eventMatchMaxWaitMs.isEmpty() && !isExempt(opContext.getRequestContext())) {
+ throw new APIThrottleException(
+ eventMatchMaxWaitMs.stream().max(Comparator.naturalOrder()).orElse(-1L),
+ "Throttled due to " + throttleEvents);
+ }
+ }
+
+ private static boolean isExempt(@Nullable RequestContext requestContext) {
+ // Exclude internal calls
+ if (requestContext == null
+ || requestContext.getUserAgent() == null
+ || requestContext.getUserAgent().isEmpty()) {
+ return true;
+ }
+
+ UserAgent ua = UAA.parse(requestContext.getUserAgent());
+ return AGENT_EXEMPTIONS.contains(ua.get(UserAgent.AGENT_CLASS).getValue());
+ }
+
+ private static Set eventMatchMaxWaitMs(
+ @Nullable Set throttleEvents, boolean isTimeseries) {
+ if (throttleEvents == null) {
+ return Set.of();
+ }
+
+ return throttleEvents.stream()
+ .map(
+ e ->
+ e.getActiveThrottleMaxWaitMs(
+ Set.of(MANUAL, isTimeseries ? MCL_TIMESERIES_LAG : MCL_VERSIONED_LAG)))
+ .filter(Objects::nonNull)
+ .collect(Collectors.toSet());
+ }
+}
diff --git a/metadata-io/src/main/java/com/linkedin/metadata/dao/throttle/APIThrottleException.java b/metadata-io/src/main/java/com/linkedin/metadata/dao/throttle/APIThrottleException.java
new file mode 100644
index 00000000000000..6f1a5fcd1af220
--- /dev/null
+++ b/metadata-io/src/main/java/com/linkedin/metadata/dao/throttle/APIThrottleException.java
@@ -0,0 +1,20 @@
+package com.linkedin.metadata.dao.throttle;
+
+import java.util.concurrent.TimeUnit;
+
+public class APIThrottleException extends RuntimeException {
+ private final long durationMs;
+
+ public APIThrottleException(long durationMs, String message) {
+ super(message);
+ this.durationMs = durationMs;
+ }
+
+ public long getDurationMs() {
+ return durationMs;
+ }
+
+ public long getDurationSeconds() {
+ return TimeUnit.MILLISECONDS.toSeconds(durationMs);
+ }
+}
diff --git a/metadata-io/src/main/java/com/linkedin/metadata/dao/throttle/NoOpSensor.java b/metadata-io/src/main/java/com/linkedin/metadata/dao/throttle/NoOpSensor.java
new file mode 100644
index 00000000000000..29692ff86d805f
--- /dev/null
+++ b/metadata-io/src/main/java/com/linkedin/metadata/dao/throttle/NoOpSensor.java
@@ -0,0 +1,12 @@
+package com.linkedin.metadata.dao.throttle;
+
+import java.util.function.Function;
+import lombok.EqualsAndHashCode;
+
+@EqualsAndHashCode
+public class NoOpSensor implements ThrottleSensor {
+ @Override
+ public ThrottleSensor addCallback(Function callback) {
+ return this;
+ }
+}
diff --git a/metadata-io/src/main/java/com/linkedin/metadata/dao/throttle/ThrottleControl.java b/metadata-io/src/main/java/com/linkedin/metadata/dao/throttle/ThrottleControl.java
new file mode 100644
index 00000000000000..b08c43078e79ba
--- /dev/null
+++ b/metadata-io/src/main/java/com/linkedin/metadata/dao/throttle/ThrottleControl.java
@@ -0,0 +1,31 @@
+package com.linkedin.metadata.dao.throttle;
+
+import java.util.function.Consumer;
+import javax.annotation.Nullable;
+import lombok.AccessLevel;
+import lombok.Builder;
+import lombok.Getter;
+import lombok.Value;
+import lombok.experimental.Accessors;
+
+@Value
+@Accessors(fluent = true)
+@Builder
+public class ThrottleControl {
+ public static ThrottleControl NONE = ThrottleControl.builder().build();
+
+ // call this after pause/sleep
+ @Getter(AccessLevel.NONE)
+ @Nullable
+ Consumer callback;
+
+ public boolean hasCallback() {
+ return callback != null;
+ }
+
+ public void execute(ThrottleEvent throttleEvent) {
+ if (callback != null) {
+ callback.accept(throttleEvent);
+ }
+ }
+}
diff --git a/metadata-io/src/main/java/com/linkedin/metadata/dao/throttle/ThrottleEvent.java b/metadata-io/src/main/java/com/linkedin/metadata/dao/throttle/ThrottleEvent.java
new file mode 100644
index 00000000000000..d382c87d6b546a
--- /dev/null
+++ b/metadata-io/src/main/java/com/linkedin/metadata/dao/throttle/ThrottleEvent.java
@@ -0,0 +1,96 @@
+package com.linkedin.metadata.dao.throttle;
+
+import java.util.Comparator;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import javax.annotation.Nullable;
+import lombok.Builder;
+import lombok.Value;
+import lombok.experimental.Accessors;
+
+@Value
+@Accessors(fluent = true)
+@Builder
+public class ThrottleEvent {
+ public static ThrottleEvent throttle(Map backoffWaitMs) {
+ return ThrottleEvent.builder()
+ .backoffWaitMs(backoffWaitMs)
+ .throttled(
+ backoffWaitMs.entrySet().stream()
+ .filter(entry -> entry.getValue() > 0)
+ .map(entry -> Map.entry(entry.getKey(), true))
+ .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)))
+ .build();
+ }
+
+ public static ThrottleEvent clearThrottle(ThrottleEvent throttleEvent) {
+ return clearThrottle(throttleEvent.getActiveThrottles());
+ }
+
+ public static ThrottleEvent clearThrottle(Set clear) {
+ return ThrottleEvent.builder()
+ .throttled(
+ clear.stream()
+ .map(t -> Map.entry(t, false))
+ .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)))
+ .build();
+ }
+
+ Map throttled;
+ Map backoffWaitMs;
+
+ public Set getActiveThrottles() {
+ return streamTypes().filter(this::isThrottled).collect(Collectors.toSet());
+ }
+
+ /**
+ * Return the suggested wait time in milliseconds given an optional list filter types.
+ *
+ * @param filterTypes empty for no filters
+ * @return suggested wait time in milliseconds, negative if no suggestion is possible, null if no
+ * wait
+ */
+ @Nullable
+ public Long getActiveThrottleMaxWaitMs(Set filterTypes) {
+ Set activeThrottles =
+ getActiveThrottles().stream()
+ .filter(a -> filterTypes.isEmpty() || filterTypes.contains(a))
+ .collect(Collectors.toSet());
+
+ if (activeThrottles.isEmpty()) {
+ return null;
+ }
+
+ if (!activeThrottles.contains(ThrottleType.MANUAL) && backoffWaitMs != null) {
+ return getActiveThrottles().stream()
+ .map(t -> backoffWaitMs.getOrDefault(t, -1L))
+ .max(Comparator.naturalOrder())
+ .orElse(-1L);
+ }
+
+ return -1L;
+ }
+
+ public Set getDisabledThrottles() {
+ return streamTypes().filter(t -> !isThrottled(t)).collect(Collectors.toSet());
+ }
+
+ public boolean isThrottled() {
+ return (throttled != null && throttled.values().stream().anyMatch(b -> b))
+ || (backoffWaitMs != null && backoffWaitMs.values().stream().anyMatch(wait -> wait > 0));
+ }
+
+ private boolean isThrottled(ThrottleType throttleType) {
+ return (throttled != null && throttled.getOrDefault(throttleType, false))
+ || (backoffWaitMs != null && backoffWaitMs.getOrDefault(throttleType, 0L) > 0);
+ }
+
+ private Stream streamTypes() {
+ return Stream.concat(
+ throttled != null ? throttled.keySet().stream() : Stream.empty(),
+ backoffWaitMs != null ? backoffWaitMs.keySet().stream() : Stream.empty())
+ .distinct();
+ }
+}
diff --git a/metadata-io/src/main/java/com/linkedin/metadata/dao/throttle/ThrottleSensor.java b/metadata-io/src/main/java/com/linkedin/metadata/dao/throttle/ThrottleSensor.java
new file mode 100644
index 00000000000000..d92defe5edbcb7
--- /dev/null
+++ b/metadata-io/src/main/java/com/linkedin/metadata/dao/throttle/ThrottleSensor.java
@@ -0,0 +1,7 @@
+package com.linkedin.metadata.dao.throttle;
+
+import java.util.function.Function;
+
+public interface ThrottleSensor {
+ ThrottleSensor addCallback(Function callback);
+}
diff --git a/metadata-io/src/main/java/com/linkedin/metadata/dao/throttle/ThrottleType.java b/metadata-io/src/main/java/com/linkedin/metadata/dao/throttle/ThrottleType.java
new file mode 100644
index 00000000000000..ac6d13a58cd079
--- /dev/null
+++ b/metadata-io/src/main/java/com/linkedin/metadata/dao/throttle/ThrottleType.java
@@ -0,0 +1,7 @@
+package com.linkedin.metadata.dao.throttle;
+
+public enum ThrottleType {
+ MCL_TIMESERIES_LAG,
+ MCL_VERSIONED_LAG,
+ MANUAL
+}
diff --git a/metadata-io/src/main/java/com/linkedin/metadata/entity/EntityServiceImpl.java b/metadata-io/src/main/java/com/linkedin/metadata/entity/EntityServiceImpl.java
index 4b83ea40f722db..c584b8ac4d7a27 100644
--- a/metadata-io/src/main/java/com/linkedin/metadata/entity/EntityServiceImpl.java
+++ b/metadata-io/src/main/java/com/linkedin/metadata/entity/EntityServiceImpl.java
@@ -51,6 +51,10 @@
import com.linkedin.metadata.aspect.plugins.validation.ValidationExceptionCollection;
import com.linkedin.metadata.aspect.utils.DefaultAspectsUtil;
import com.linkedin.metadata.config.PreProcessHooks;
+import com.linkedin.metadata.dao.throttle.APIThrottle;
+import com.linkedin.metadata.dao.throttle.ThrottleControl;
+import com.linkedin.metadata.dao.throttle.ThrottleEvent;
+import com.linkedin.metadata.dao.throttle.ThrottleType;
import com.linkedin.metadata.entity.ebean.EbeanAspectV2;
import com.linkedin.metadata.entity.ebean.PartitionedStream;
import com.linkedin.metadata.entity.ebean.batch.AspectsBatchImpl;
@@ -96,6 +100,7 @@
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
@@ -159,6 +164,9 @@ public class EntityServiceImpl implements EntityService {
private final Integer ebeanMaxTransactionRetry;
private final boolean enableBrowseV2;
+ @Getter
+ private final Map, ThrottleEvent> throttleEvents = new ConcurrentHashMap<>();
+
public EntityServiceImpl(
@Nonnull final AspectDao aspectDao,
@Nonnull final EventProducer producer,
@@ -194,6 +202,17 @@ public void setUpdateIndicesService(@Nullable SearchIndicesService updateIndices
this.updateIndicesService = updateIndicesService;
}
+ public ThrottleControl handleThrottleEvent(ThrottleEvent throttleEvent) {
+ final Set activeEvents = throttleEvent.getActiveThrottles();
+ // store throttle event
+ throttleEvents.put(activeEvents, throttleEvent);
+
+ return ThrottleControl.builder()
+ // clear throttle event
+ .callback(clearThrottle -> throttleEvents.remove(clearThrottle.getDisabledThrottles()))
+ .build();
+ }
+
@Override
public RecordTemplate getLatestAspect(
@Nonnull OperationContext opContext, @Nonnull Urn urn, @Nonnull String aspectName) {
@@ -769,6 +788,9 @@ public List ingestAspects(
return Collections.emptyList();
}
+ // Handle throttling
+ APIThrottle.evaluate(opContext, new HashSet<>(throttleEvents.values()), false);
+
List ingestResults =
ingestAspectsToLocalDB(opContext, aspectsBatch, overwrite);
@@ -1183,6 +1205,9 @@ private Stream ingestTimeseriesProposal(
}
if (!async) {
+ // Handle throttling
+ APIThrottle.evaluate(opContext, new HashSet<>(throttleEvents.values()), true);
+
// Create default non-timeseries aspects for timeseries aspects
List timeseriesKeyAspects =
aspectsBatch.getMCPItems().stream()
diff --git a/metadata-io/src/test/java/com/linkedin/metadata/dao/throttle/APIThrottleTest.java b/metadata-io/src/test/java/com/linkedin/metadata/dao/throttle/APIThrottleTest.java
new file mode 100644
index 00000000000000..c86d80be2d7fd2
--- /dev/null
+++ b/metadata-io/src/test/java/com/linkedin/metadata/dao/throttle/APIThrottleTest.java
@@ -0,0 +1,162 @@
+package com.linkedin.metadata.dao.throttle;
+
+import static com.linkedin.metadata.dao.throttle.ThrottleType.MANUAL;
+import static com.linkedin.metadata.dao.throttle.ThrottleType.MCL_TIMESERIES_LAG;
+import static com.linkedin.metadata.dao.throttle.ThrottleType.MCL_VERSIONED_LAG;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import io.datahubproject.metadata.context.OperationContext;
+import io.datahubproject.metadata.context.RequestContext;
+import io.datahubproject.test.metadata.context.TestOperationContexts;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.testng.Assert;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+public class APIThrottleTest {
+ private static final ThrottleEvent MANUAL_THROTTLED_EVENT =
+ ThrottleEvent.builder().throttled(Map.of(MANUAL, true)).build();
+ private static final ThrottleEvent MCL_TIMESERIES_THROTTLED_EVENT =
+ ThrottleEvent.builder().throttled(Map.of(MCL_TIMESERIES_LAG, true)).build();
+ private static final ThrottleEvent MCL_VERSIONED_THROTTLED_EVENT =
+ ThrottleEvent.builder().throttled(Map.of(MCL_VERSIONED_LAG, true)).build();
+ private static final ThrottleEvent ALL_MCL_THROTTLED_EVENT =
+ ThrottleEvent.builder()
+ .throttled(Map.of(MCL_TIMESERIES_LAG, true, MCL_VERSIONED_LAG, true))
+ .build();
+ private static final ThrottleEvent ALL_THROTTLED_EVENT =
+ ThrottleEvent.builder()
+ .throttled(Map.of(MANUAL, true, MCL_TIMESERIES_LAG, true, MCL_VERSIONED_LAG, true))
+ .build();
+ public static final Set ALL_EVENTS =
+ Set.of(
+ MANUAL_THROTTLED_EVENT,
+ MCL_TIMESERIES_THROTTLED_EVENT,
+ MCL_VERSIONED_THROTTLED_EVENT,
+ ALL_MCL_THROTTLED_EVENT,
+ ALL_THROTTLED_EVENT);
+
+ private OperationContext opContext;
+ private RequestContext mockRequestContext;
+
+ @BeforeMethod
+ public void init() {
+ mockRequestContext = mock(RequestContext.class);
+ opContext = TestOperationContexts.userContextNoSearchAuthorization(mockRequestContext);
+ }
+
+ @Test
+ public void testExemptions() {
+ List exemptions =
+ List.of(
+ "",
+ "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/127.0.0.0 Safari/537.36",
+ "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/127.0.0.0 Safari/537.36",
+ "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:129.0) Gecko/20100101 Firefox/129.0",
+ "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/127.0.0.0 Safari/537.36 Edg/127.0.0.0",
+ "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/17.6 Safari/605.1.15");
+
+ for (ThrottleEvent event : ALL_EVENTS) {
+ when(mockRequestContext.getUserAgent()).thenReturn(null);
+ try {
+ APIThrottle.evaluate(opContext, Set.of(event), false);
+ } catch (Exception ex) {
+ Assert.fail("Exception was thrown and NOT expected! " + event);
+ }
+ try {
+ APIThrottle.evaluate(opContext, Set.of(event), true);
+ } catch (Exception ex) {
+ Assert.fail("Exception was thrown and NOT expected! " + event);
+ }
+
+ // Browser tests
+ for (String ua : exemptions) {
+ try {
+ when(mockRequestContext.getUserAgent()).thenReturn(ua);
+ APIThrottle.evaluate(opContext, Set.of(event), true);
+ } catch (Exception ex) {
+ Assert.fail("Exception was thrown and NOT expected! " + event);
+ }
+ try {
+ when(mockRequestContext.getUserAgent()).thenReturn(ua);
+ APIThrottle.evaluate(opContext, Set.of(event), false);
+ } catch (Exception ex) {
+ Assert.fail("Exception was thrown and NOT expected! " + event);
+ }
+ }
+ }
+ }
+
+ @Test
+ public void testThrottleException() {
+ List applicable =
+ List.of(
+ "python-requests/2.28.2",
+ "Apache-HttpClient/4.5.5 (Java/1.8.0_162)",
+ "okhttp/4.9.3.7",
+ "Go-http-client/1.1");
+
+ for (ThrottleEvent event : ALL_EVENTS) {
+ for (String ua : applicable) {
+ // timeseries lag present
+ if (event.getActiveThrottles().contains(MCL_TIMESERIES_LAG)
+ && !event.getActiveThrottles().contains(MANUAL)) {
+ try {
+ when(mockRequestContext.getUserAgent()).thenReturn(ua);
+ APIThrottle.evaluate(opContext, Set.of(event), true);
+ Assert.fail(String.format("Exception WAS expected! %s %s", ua, event));
+ } catch (Exception ignored) {
+ }
+ }
+ if (!event.getActiveThrottles().contains(MCL_TIMESERIES_LAG)
+ && !event.getActiveThrottles().contains(MANUAL)) {
+ try {
+ when(mockRequestContext.getUserAgent()).thenReturn(ua);
+ APIThrottle.evaluate(opContext, Set.of(event), true);
+ } catch (Exception ex) {
+ Assert.fail(String.format("Exception was thrown and NOT expected! %s %s", ua, event));
+ }
+ }
+
+ // versioned lag present
+ if (event.getActiveThrottles().contains(MCL_VERSIONED_LAG)
+ && !event.getActiveThrottles().contains(MANUAL)) {
+ try {
+ when(mockRequestContext.getUserAgent()).thenReturn(ua);
+ APIThrottle.evaluate(opContext, Set.of(event), false);
+ Assert.fail(String.format("Exception WAS expected! %s %s", ua, event));
+ } catch (Exception ignored) {
+ }
+ }
+ if (!event.getActiveThrottles().contains(MCL_VERSIONED_LAG)
+ && !event.getActiveThrottles().contains(MANUAL)) {
+ try {
+ when(mockRequestContext.getUserAgent()).thenReturn(ua);
+ APIThrottle.evaluate(opContext, Set.of(event), false);
+ } catch (Exception ex) {
+ Assert.fail(String.format("Exception was thrown and NOT expected! %s %s", ua, event));
+ }
+ }
+
+ // manual throttle active
+ if (event.getActiveThrottles().contains(MANUAL)) {
+ try {
+ when(mockRequestContext.getUserAgent()).thenReturn(ua);
+ APIThrottle.evaluate(opContext, Set.of(event), true);
+ Assert.fail(String.format("Exception WAS expected! %s %s", ua, event));
+ } catch (Exception ignored) {
+ }
+ try {
+ when(mockRequestContext.getUserAgent()).thenReturn(ua);
+ APIThrottle.evaluate(opContext, Set.of(event), false);
+ Assert.fail(String.format("Exception WAS expected! %s %s", ua, event));
+ } catch (Exception ignored) {
+ }
+ }
+ }
+ }
+ }
+}
diff --git a/metadata-jobs/mce-consumer/src/main/java/com/linkedin/metadata/kafka/MetadataChangeProposalsProcessor.java b/metadata-jobs/mce-consumer/src/main/java/com/linkedin/metadata/kafka/MetadataChangeProposalsProcessor.java
index 60d9c7496dfcb5..2f3f35697e476c 100644
--- a/metadata-jobs/mce-consumer/src/main/java/com/linkedin/metadata/kafka/MetadataChangeProposalsProcessor.java
+++ b/metadata-jobs/mce-consumer/src/main/java/com/linkedin/metadata/kafka/MetadataChangeProposalsProcessor.java
@@ -4,8 +4,11 @@
import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.Timer;
import com.linkedin.entity.client.SystemEntityClient;
+import com.linkedin.gms.factory.config.ConfigurationProvider;
import com.linkedin.gms.factory.entityclient.RestliEntityClientFactory;
import com.linkedin.metadata.EventUtils;
+import com.linkedin.metadata.dao.throttle.ThrottleControl;
+import com.linkedin.metadata.dao.throttle.ThrottleSensor;
import com.linkedin.metadata.kafka.config.MetadataChangeProposalProcessorCondition;
import com.linkedin.metadata.utils.metrics.MetricUtils;
import com.linkedin.mxe.FailedMetadataChangeProposal;
@@ -13,7 +16,9 @@
import com.linkedin.mxe.Topics;
import io.datahubproject.metadata.context.OperationContext;
import java.io.IOException;
+import java.util.Optional;
import javax.annotation.Nonnull;
+import javax.annotation.PostConstruct;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.avro.generic.GenericRecord;
@@ -22,11 +27,14 @@
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
+import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Conditional;
import org.springframework.context.annotation.Import;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.annotation.KafkaListener;
+import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
+import org.springframework.kafka.listener.MessageListenerContainer;
import org.springframework.stereotype.Component;
@Slf4j
@@ -36,11 +44,19 @@
@EnableKafka
@RequiredArgsConstructor
public class MetadataChangeProposalsProcessor {
+ private static final String CONSUMER_GROUP_ID_VALUE =
+ "${METADATA_CHANGE_PROPOSAL_KAFKA_CONSUMER_GROUP_ID:generic-mce-consumer-job-client}";
private final OperationContext systemOperationContext;
private final SystemEntityClient entityClient;
private final Producer kafkaProducer;
+ @Qualifier("kafkaThrottle")
+ private final ThrottleSensor kafkaThrottle;
+
+ private final KafkaListenerEndpointRegistry registry;
+ private final ConfigurationProvider provider;
+
private final Histogram kafkaLagStats =
MetricUtils.get().histogram(MetricRegistry.name(this.getClass(), "kafkaLag"));
@@ -50,8 +66,47 @@ public class MetadataChangeProposalsProcessor {
+ "}")
private String fmcpTopicName;
+ @Value(CONSUMER_GROUP_ID_VALUE)
+ private String mceConsumerGroupId;
+
+ @PostConstruct
+ public void registerConsumerThrottle() {
+ if (kafkaThrottle != null
+ && provider
+ .getMetadataChangeProposal()
+ .getThrottle()
+ .getComponents()
+ .getMceConsumer()
+ .isEnabled()) {
+ log.info("MCE Consumer Throttle Enabled");
+ kafkaThrottle.addCallback(
+ (throttleEvent) -> {
+ Optional container =
+ Optional.ofNullable(registry.getListenerContainer(mceConsumerGroupId));
+ if (container.isEmpty()) {
+ log.warn(
+ "Expected container was missing: {} throttle is not possible.",
+ mceConsumerGroupId);
+ } else {
+ if (throttleEvent.isThrottled()) {
+ container.ifPresent(MessageListenerContainer::pause);
+ return ThrottleControl.builder()
+ // resume consumer after sleep
+ .callback(
+ (resumeEvent) -> container.ifPresent(MessageListenerContainer::resume))
+ .build();
+ }
+ }
+
+ return ThrottleControl.NONE;
+ });
+ } else {
+ log.info("MCE Consumer Throttle Disabled");
+ }
+ }
+
@KafkaListener(
- id = "${METADATA_CHANGE_PROPOSAL_KAFKA_CONSUMER_GROUP_ID:generic-mce-consumer-job-client}",
+ id = CONSUMER_GROUP_ID_VALUE,
topics = "${METADATA_CHANGE_PROPOSAL_TOPIC_NAME:" + Topics.METADATA_CHANGE_PROPOSAL + "}",
containerFactory = "kafkaEventConsumer")
public void consume(final ConsumerRecord consumerRecord) {
diff --git a/metadata-operation-context/src/main/java/io/datahubproject/test/metadata/context/TestOperationContexts.java b/metadata-operation-context/src/main/java/io/datahubproject/test/metadata/context/TestOperationContexts.java
index 76f58fb4751085..cdcbb540eeda43 100644
--- a/metadata-operation-context/src/main/java/io/datahubproject/test/metadata/context/TestOperationContexts.java
+++ b/metadata-operation-context/src/main/java/io/datahubproject/test/metadata/context/TestOperationContexts.java
@@ -259,6 +259,12 @@ public static OperationContext userContextNoSearchAuthorization(
.asSession(RequestContext.TEST, authorizer, sessionAuthorization);
}
+ public static OperationContext userContextNoSearchAuthorization(
+ @Nonnull RequestContext requestContext) {
+ return systemContextNoSearchAuthorization(defaultEntityRegistry())
+ .asSession(requestContext, Authorizer.EMPTY, TEST_USER_AUTH);
+ }
+
@Builder
public static class EmptyAspectRetriever implements AspectRetriever {
private final Supplier entityRegistrySupplier;
diff --git a/metadata-service/configuration/src/main/java/com/linkedin/metadata/config/MetadataChangeProposalConfig.java b/metadata-service/configuration/src/main/java/com/linkedin/metadata/config/MetadataChangeProposalConfig.java
index f988758beee363..4e8c18912c40ea 100644
--- a/metadata-service/configuration/src/main/java/com/linkedin/metadata/config/MetadataChangeProposalConfig.java
+++ b/metadata-service/configuration/src/main/java/com/linkedin/metadata/config/MetadataChangeProposalConfig.java
@@ -14,10 +14,30 @@ public class MetadataChangeProposalConfig {
@Accessors(chain = true)
public static class ThrottlesConfig {
Integer updateIntervalMs;
+ ComponentsThrottleConfig components;
ThrottleConfig versioned;
ThrottleConfig timeseries;
}
+ @Data
+ @Accessors(chain = true)
+ public static class ComponentsThrottleConfig {
+ MceConsumerThrottleConfig mceConsumer;
+ ApiRequestsThrottleConfig apiRequests;
+ }
+
+ @Data
+ @Accessors(chain = true)
+ public static class MceConsumerThrottleConfig {
+ boolean enabled;
+ }
+
+ @Data
+ @Accessors(chain = true)
+ public static class ApiRequestsThrottleConfig {
+ boolean enabled;
+ }
+
@Data
@Accessors(chain = true)
public static class ThrottleConfig {
diff --git a/metadata-service/configuration/src/main/resources/application.yaml b/metadata-service/configuration/src/main/resources/application.yaml
index 0977c64d0e8609..18944e6cc7ba63 100644
--- a/metadata-service/configuration/src/main/resources/application.yaml
+++ b/metadata-service/configuration/src/main/resources/application.yaml
@@ -523,18 +523,27 @@ metadataChangeProposal:
throttle:
updateIntervalMs: ${MCP_THROTTLE_UPDATE_INTERVAL_MS:60000}
- # Versioned MCL topic
+ # What component is throttled
+ components:
+ mceConsumer:
+ enabled: ${MCP_MCE_CONSUMER_THROTTLE_ENABLED:false}
+ apiRequests:
+ enabled: ${MCP_API_REQUESTS_THROTTLE_ENABLED:false}
+
+ # How is it throttled
+ # Versioned MCL topic settings
versioned:
- # Whether to throttle MCP processing based on MCL backlog
+ # Whether to monitor MCL versioned backlog
enabled: ${MCP_VERSIONED_THROTTLE_ENABLED:false}
threshold: ${MCP_VERSIONED_THRESHOLD:4000} # throttle threshold
maxAttempts: ${MCP_VERSIONED_MAX_ATTEMPTS:1000}
initialIntervalMs: ${MCP_VERSIONED_INITIAL_INTERVAL_MS:100}
multiplier: ${MCP_VERSIONED_MULTIPLIER:10}
maxIntervalMs: ${MCP_VERSIONED_MAX_INTERVAL_MS:30000}
- # Timeseries MCL topic
+
+ # Timeseries MCL topic settings
timeseries:
- # Whether to throttle MCP processing based on MCL backlog
+ # Whether to monitor MCL timeseries backlog
enabled: ${MCP_TIMESERIES_THROTTLE_ENABLED:false}
threshold: ${MCP_TIMESERIES_THRESHOLD:4000} # throttle threshold
maxAttempts: ${MCP_TIMESERIES_MAX_ATTEMPTS:1000}
diff --git a/metadata-service/factories/src/main/java/com/linkedin/gms/factory/common/CacheConfig.java b/metadata-service/factories/src/main/java/com/linkedin/gms/factory/common/CacheConfig.java
index 185e1e3ae624c4..383716a80cc60a 100644
--- a/metadata-service/factories/src/main/java/com/linkedin/gms/factory/common/CacheConfig.java
+++ b/metadata-service/factories/src/main/java/com/linkedin/gms/factory/common/CacheConfig.java
@@ -4,12 +4,18 @@
import com.hazelcast.config.Config;
import com.hazelcast.config.EvictionConfig;
import com.hazelcast.config.EvictionPolicy;
+import com.hazelcast.config.InMemoryFormat;
import com.hazelcast.config.MapConfig;
import com.hazelcast.config.MaxSizePolicy;
+import com.hazelcast.config.MergePolicyConfig;
+import com.hazelcast.config.ReplicatedMapConfig;
import com.hazelcast.core.Hazelcast;
import com.hazelcast.core.HazelcastInstance;
+import com.hazelcast.spi.merge.LatestUpdateMergePolicy;
import com.hazelcast.spring.cache.HazelcastCacheManager;
+import java.util.List;
import java.util.concurrent.TimeUnit;
+import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.cache.CacheManager;
@@ -19,6 +25,7 @@
@Configuration
public class CacheConfig {
+ public static final String THROTTLE_MAP = "distributedThrottle";
@Value("${cache.primary.ttlSeconds:600}")
private int cacheTtlSeconds;
@@ -45,23 +52,15 @@ private Caffeine