diff --git a/build.gradle b/build.gradle index 22217f2149c5e7..a425e1a38aa479 100644 --- a/build.gradle +++ b/build.gradle @@ -1,6 +1,7 @@ buildscript { ext.junitJupiterVersion = '5.6.1' - ext.pegasusVersion = '29.22.16' + // Releases: https://github.com/linkedin/rest.li/blob/master/CHANGELOG.md + ext.pegasusVersion = '29.40.14' ext.mavenVersion = '3.6.3' ext.springVersion = '5.3.20' ext.springBootVersion = '2.5.12' diff --git a/datahub-frontend/play.gradle b/datahub-frontend/play.gradle index 7dfb9ddc6262a1..453bbc5802dd48 100644 --- a/datahub-frontend/play.gradle +++ b/datahub-frontend/play.gradle @@ -64,6 +64,7 @@ dependencies { testImplementation externalDependency.mockito testImplementation externalDependency.playTest + testImplementation 'org.awaitility:awaitility:4.2.0' testImplementation 'no.nav.security:mock-oauth2-server:0.3.1' testImplementation 'org.junit-pioneer:junit-pioneer:1.9.1' testImplementation externalDependency.junitJupiterApi diff --git a/datahub-frontend/test/app/ApplicationTest.java b/datahub-frontend/test/app/ApplicationTest.java index 58a602532fc72c..8a58e733a69641 100644 --- a/datahub-frontend/test/app/ApplicationTest.java +++ b/datahub-frontend/test/app/ApplicationTest.java @@ -5,6 +5,8 @@ import no.nav.security.mock.oauth2.token.DefaultOAuth2TokenCallback; import okhttp3.mockwebserver.MockResponse; import okhttp3.mockwebserver.MockWebServer; +import org.awaitility.Awaitility; +import org.awaitility.Durations; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; @@ -93,7 +95,8 @@ public void init() throws IOException, InterruptedException { startServer(); createBrowser(); - Thread.sleep(5000); + + Awaitility.await().timeout(Durations.TEN_SECONDS).until(() -> app != null); } @AfterAll diff --git a/docker/datahub-mce-consumer/Dockerfile b/docker/datahub-mce-consumer/Dockerfile index eb1da2236c7827..5b2cb40555cce0 100644 --- a/docker/datahub-mce-consumer/Dockerfile +++ b/docker/datahub-mce-consumer/Dockerfile @@ -28,11 +28,12 @@ RUN apk --no-cache --update-cache --available upgrade \ && apk --no-cache add openjdk8 openjdk11 perl COPY . datahub-src -RUN cd datahub-src && ./gradlew :metadata-jobs:mce-consumer-job:build +RUN cd datahub-src && ./gradlew :metadata-jobs:mce-consumer-job:build --stacktrace RUN cd datahub-src && cp metadata-jobs/mce-consumer-job/build/libs/mce-consumer-job.jar ../mce-consumer-job.jar FROM base as prod-install COPY --from=prod-build /mce-consumer-job.jar /datahub/datahub-mce-consumer/bin/ +COPY --from=prod-build /datahub-src/metadata-models/src/main/resources/entity-registry.yml /datahub/datahub-mce-consumer/resources/entity-registry.yml COPY --from=prod-build /datahub-src/docker/datahub-mce-consumer/start.sh /datahub/datahub-mce-consumer/scripts/ COPY --from=prod-build /datahub-src/docker/monitoring/client-prometheus-config.yaml /datahub/datahub-mce-consumer/scripts/prometheus-config.yaml RUN chmod +x /datahub/datahub-mce-consumer/scripts/start.sh diff --git a/docker/datahub-mce-consumer/env/docker.env b/docker/datahub-mce-consumer/env/docker.env index 95d5dd2c789d1c..c5f2366cd49394 100644 --- a/docker/datahub-mce-consumer/env/docker.env +++ b/docker/datahub-mce-consumer/env/docker.env @@ -1,8 +1,18 @@ MCE_CONSUMER_ENABLED=true +EBEAN_DATASOURCE_USERNAME=datahub +EBEAN_DATASOURCE_PASSWORD=datahub +EBEAN_DATASOURCE_HOST=mysql:3306 +EBEAN_DATASOURCE_URL=jdbc:mysql://mysql:3306/datahub?verifyServerCertificate=false&useSSL=true&useUnicode=yes&characterEncoding=UTF-8 +EBEAN_DATASOURCE_DRIVER=com.mysql.jdbc.Driver KAFKA_BOOTSTRAP_SERVER=broker:29092 KAFKA_SCHEMAREGISTRY_URL=http://schema-registry:8081 -DATAHUB_GMS_HOST=datahub-gms -DATAHUB_GMS_PORT=8080 +ELASTICSEARCH_HOST=elasticsearch +ELASTICSEARCH_PORT=9200 +ES_BULK_REFRESH_POLICY=WAIT_UNTIL +GRAPH_SERVICE_DIFF_MODE_ENABLED=true +GRAPH_SERVICE_IMPL=elasticsearch +JAVA_OPTS=-Xms1g -Xmx1g +ENTITY_REGISTRY_CONFIG_PATH=/datahub/datahub-mce-consumer/resources/entity-registry.yml # Uncomment to configure kafka topic names # Make sure these names are consistent across the whole deployment @@ -12,8 +22,18 @@ DATAHUB_GMS_PORT=8080 # METADATA_CHANGE_EVENT_NAME=MetadataChangeEvent_v4 # FAILED_METADATA_CHANGE_EVENT_NAME=FailedMetadataChangeEvent_v4 -# Uncomment and set these to support SSL connection to GMS -# NOTE: Currently GMS itself does not offer SSL support, these settings are intended for when there is a proxy in front -# of GMS that handles SSL, such as an EC2 Load Balancer. -#GMS_USE_SSL=true -#GMS_SSL_PROTOCOL= \ No newline at end of file +# Uncomment and set these to support SSL connection to Elasticsearch +# ELASTICSEARCH_USE_SSL=true +# ELASTICSEARCH_SSL_PROTOCOL=TLSv1.2 +# ELASTICSEARCH_SSL_SECURE_RANDOM_IMPL= +# ELASTICSEARCH_SSL_TRUSTSTORE_FILE= +# ELASTICSEARCH_SSL_TRUSTSTORE_TYPE= +# ELASTICSEARCH_SSL_TRUSTSTORE_PASSWORD= +# ELASTICSEARCH_SSL_KEYSTORE_FILE= +# ELASTICSEARCH_SSL_KEYSTORE_TYPE= +# ELASTICSEARCH_SSL_KEYSTORE_PASSWORD= + +# To use simple username/password authentication to Elasticsearch over HTTPS +# set ELASTICSEARCH_USE_SSL=true and uncomment: +# ELASTICSEARCH_USERNAME= +# ELASTICSEARCH_PASSWORD= diff --git a/docker/docker-compose.consumers-without-neo4j.yml b/docker/docker-compose.consumers-without-neo4j.yml index 6830916e442042..7521e2ced618ae 100644 --- a/docker/docker-compose.consumers-without-neo4j.yml +++ b/docker/docker-compose.consumers-without-neo4j.yml @@ -32,4 +32,5 @@ services: - "9090:9090" depends_on: - kafka-setup - - datahub-gms + - elasticsearch-setup + - mysql-setup diff --git a/docker/docker-compose.consumers.yml b/docker/docker-compose.consumers.yml index 0eac8ea920b51d..fbe3a10d5018f5 100644 --- a/docker/docker-compose.consumers.yml +++ b/docker/docker-compose.consumers.yml @@ -33,4 +33,5 @@ services: - "9090:9090" depends_on: - kafka-setup - - datahub-gms + - elasticsearch-setup + - mysql-setup diff --git a/docker/quickstart/docker-compose.consumers-without-neo4j.quickstart.yml b/docker/quickstart/docker-compose.consumers-without-neo4j.quickstart.yml index 26bd6c190db80e..f08fe82280b38c 100644 --- a/docker/quickstart/docker-compose.consumers-without-neo4j.quickstart.yml +++ b/docker/quickstart/docker-compose.consumers-without-neo4j.quickstart.yml @@ -28,13 +28,30 @@ services: container_name: datahub-mce-consumer depends_on: - kafka-setup - - datahub-gms + - elasticsearch-setup + - mysql-setup environment: - MCE_CONSUMER_ENABLED=true - KAFKA_BOOTSTRAP_SERVER=broker:29092 - KAFKA_SCHEMAREGISTRY_URL=http://schema-registry:8081 - - DATAHUB_GMS_HOST=datahub-gms - - DATAHUB_GMS_PORT=8080 + - DATAHUB_SERVER_TYPE=${DATAHUB_SERVER_TYPE:-quickstart} + - DATAHUB_TELEMETRY_ENABLED=${DATAHUB_TELEMETRY_ENABLED:-true} + - EBEAN_DATASOURCE_USERNAME=datahub + - EBEAN_DATASOURCE_PASSWORD=datahub + - EBEAN_DATASOURCE_HOST=mysql:3306 + - EBEAN_DATASOURCE_URL=jdbc:mysql://mysql:3306/datahub?verifyServerCertificate=false&useSSL=true&useUnicode=yes&characterEncoding=UTF-8 + - EBEAN_DATASOURCE_DRIVER=com.mysql.jdbc.Driver + - ELASTICSEARCH_HOST=elasticsearch + - ELASTICSEARCH_PORT=9200 + - ES_BULK_REFRESH_POLICY=WAIT_UNTIL + - GRAPH_SERVICE_DIFF_MODE_ENABLED=true + - GRAPH_SERVICE_IMPL=elasticsearch + - JAVA_OPTS=-Xms1g -Xmx1g + - ENTITY_REGISTRY_CONFIG_PATH=/datahub/datahub-mce-consumer/resources/entity-registry.yml + - ENTITY_SERVICE_ENABLE_RETENTION=true + - MAE_CONSUMER_ENABLED=false + - PE_CONSUMER_ENABLED=false + - UI_INGESTION_ENABLED=false hostname: datahub-mce-consumer image: ${DATAHUB_MCE_CONSUMER_IMAGE:-linkedin/datahub-mce-consumer}:${DATAHUB_VERSION:-head} ports: diff --git a/docker/quickstart/docker-compose.consumers.quickstart.yml b/docker/quickstart/docker-compose.consumers.quickstart.yml index f77112621201c4..e9ae59148f784f 100644 --- a/docker/quickstart/docker-compose.consumers.quickstart.yml +++ b/docker/quickstart/docker-compose.consumers.quickstart.yml @@ -33,13 +33,34 @@ services: container_name: datahub-mce-consumer depends_on: - kafka-setup - - datahub-gms + - elasticsearch-setup + - mysql-setup environment: - MCE_CONSUMER_ENABLED=true - KAFKA_BOOTSTRAP_SERVER=broker:29092 - KAFKA_SCHEMAREGISTRY_URL=http://schema-registry:8081 - - DATAHUB_GMS_HOST=datahub-gms - - DATAHUB_GMS_PORT=8080 + - DATAHUB_SERVER_TYPE=${DATAHUB_SERVER_TYPE:-quickstart} + - DATAHUB_TELEMETRY_ENABLED=${DATAHUB_TELEMETRY_ENABLED:-true} + - EBEAN_DATASOURCE_USERNAME=datahub + - EBEAN_DATASOURCE_PASSWORD=datahub + - EBEAN_DATASOURCE_HOST=mysql:3306 + - EBEAN_DATASOURCE_URL=jdbc:mysql://mysql:3306/datahub?verifyServerCertificate=false&useSSL=true&useUnicode=yes&characterEncoding=UTF-8&enabledTLSProtocols=TLSv1.2 + - EBEAN_DATASOURCE_DRIVER=com.mysql.jdbc.Driver + - ELASTICSEARCH_HOST=elasticsearch + - ELASTICSEARCH_PORT=9200 + - ES_BULK_REFRESH_POLICY=WAIT_UNTIL + - NEO4J_HOST=http://neo4j:7474 + - NEO4J_URI=bolt://neo4j + - NEO4J_USERNAME=neo4j + - NEO4J_PASSWORD=datahub + - JAVA_OPTS=-Xms1g -Xmx1g + - GRAPH_SERVICE_DIFF_MODE_ENABLED=true + - GRAPH_SERVICE_IMPL=neo4j + - ENTITY_REGISTRY_CONFIG_PATH=/datahub/datahub-mce-consumer/resources/entity-registry.yml + - ENTITY_SERVICE_ENABLE_RETENTION=true + - MAE_CONSUMER_ENABLED=false + - PE_CONSUMER_ENABLED=false + - UI_INGESTION_ENABLED=false hostname: datahub-mce-consumer image: ${DATAHUB_MCE_CONSUMER_IMAGE:-linkedin/datahub-mce-consumer}:${DATAHUB_VERSION:-head} ports: diff --git a/metadata-auth/auth-api/build.gradle b/metadata-auth/auth-api/build.gradle index b51f630beaad36..f82f488b6f182a 100644 --- a/metadata-auth/auth-api/build.gradle +++ b/metadata-auth/auth-api/build.gradle @@ -14,6 +14,10 @@ test { useJUnit() } +jar { + archiveName = "$project.name-lib.jar" +} + shadowJar { zip64 true classifier = null diff --git a/metadata-jobs/mce-consumer-job/build.gradle b/metadata-jobs/mce-consumer-job/build.gradle index bd0b4b192ca6c8..6c806acac9e608 100644 --- a/metadata-jobs/mce-consumer-job/build.gradle +++ b/metadata-jobs/mce-consumer-job/build.gradle @@ -4,14 +4,34 @@ plugins { } dependencies { - compile project(':metadata-jobs:mce-consumer') - compile(externalDependency.springBootStarterWeb) { + implementation project(':metadata-service:factories') + implementation project(':metadata-jobs:mce-consumer') + implementation project(':entity-registry') + + implementation(externalDependency.springBootStarterWeb) { exclude module: "spring-boot-starter-tomcat" } - compile externalDependency.springBootStarterJetty - compile externalDependency.springKafka + implementation externalDependency.springBootStarterJetty + implementation externalDependency.springKafka + implementation spec.product.pegasus.restliDocgen + implementation spec.product.pegasus.restliSpringBridge + implementation externalDependency.slf4jApi + implementation externalDependency.log4j2Api + compileOnly externalDependency.lombok + implementation externalDependency.logbackClassic + + runtime externalDependency.mariadbConnector + runtime externalDependency.mysqlConnector + runtime externalDependency.postgresql + + annotationProcessor externalDependency.lombok + + testImplementation externalDependency.springBootTest + testCompile externalDependency.mockito + testCompile externalDependency.testng } bootJar { mainClassName = 'com.linkedin.metadata.kafka.MceConsumerApplication' + requiresUnpack '**/restli-servlet-impl.jar' } \ No newline at end of file diff --git a/metadata-jobs/mce-consumer-job/src/main/java/com/linkedin/metadata/kafka/MceConsumerApplication.java b/metadata-jobs/mce-consumer-job/src/main/java/com/linkedin/metadata/kafka/MceConsumerApplication.java index 840abedc20e2ad..c853fabd16b0a6 100644 --- a/metadata-jobs/mce-consumer-job/src/main/java/com/linkedin/metadata/kafka/MceConsumerApplication.java +++ b/metadata-jobs/mce-consumer-job/src/main/java/com/linkedin/metadata/kafka/MceConsumerApplication.java @@ -1,5 +1,7 @@ package com.linkedin.metadata.kafka; +import com.linkedin.gms.factory.entity.RestliEntityClientFactory; +import com.linkedin.gms.factory.spring.YamlPropertySourceFactory; import com.linkedin.gms.factory.telemetry.ScheduledAnalyticsFactory; import org.springframework.boot.SpringApplication; import org.springframework.boot.actuate.autoconfigure.solr.SolrHealthContributorAutoConfiguration; @@ -8,13 +10,32 @@ import org.springframework.boot.autoconfigure.elasticsearch.ElasticsearchRestClientAutoConfiguration; import org.springframework.context.annotation.ComponentScan; import org.springframework.context.annotation.FilterType; +import org.springframework.context.annotation.PropertySource; @SuppressWarnings("checkstyle:HideUtilityClassConstructor") -@SpringBootApplication(exclude = {ElasticsearchRestClientAutoConfiguration.class, CassandraAutoConfiguration.class, - SolrHealthContributorAutoConfiguration.class}) -@ComponentScan(excludeFilters = { - @ComponentScan.Filter(type = FilterType.ASSIGNABLE_TYPE, classes = ScheduledAnalyticsFactory.class)}) +@SpringBootApplication(exclude = { + ElasticsearchRestClientAutoConfiguration.class, + CassandraAutoConfiguration.class, + SolrHealthContributorAutoConfiguration.class +}) +@ComponentScan(basePackages = { + "com.linkedin.gms.factory.common", + "com.linkedin.gms.factory.config", + "com.linkedin.gms.factory.entity", + "com.linkedin.gms.factory.entityregistry", + "com.linkedin.gms.factory.kafka", + "com.linkedin.gms.factory.search", + "com.linkedin.gms.factory.timeseries", + "com.linkedin.restli.server", + "com.linkedin.metadata.restli" +}, excludeFilters = { + @ComponentScan.Filter(type = FilterType.ASSIGNABLE_TYPE, classes = { + ScheduledAnalyticsFactory.class, + RestliEntityClientFactory.class + }) +}) +@PropertySource(value = "classpath:/application.yml", factory = YamlPropertySourceFactory.class) public class MceConsumerApplication { public static void main(String[] args) { diff --git a/metadata-jobs/mce-consumer-job/src/main/java/com/linkedin/metadata/restli/EbeanServerConfig.java b/metadata-jobs/mce-consumer-job/src/main/java/com/linkedin/metadata/restli/EbeanServerConfig.java new file mode 100644 index 00000000000000..abd73d03a7b55a --- /dev/null +++ b/metadata-jobs/mce-consumer-job/src/main/java/com/linkedin/metadata/restli/EbeanServerConfig.java @@ -0,0 +1,74 @@ +package com.linkedin.metadata.restli; + +import io.ebean.datasource.DataSourceConfig; +import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.context.annotation.Primary; + +import java.util.HashMap; +import java.util.Map; + +import static com.linkedin.gms.factory.common.LocalEbeanServerConfigFactory.getListenerToTrackCounts; + +@Configuration +public class EbeanServerConfig { + @Value("${ebean.username}") + private String ebeanDatasourceUsername; + + @Value("${ebean.password}") + private String ebeanDatasourcePassword; + + @Value("${ebean.driver}") + private String ebeanDatasourceDriver; + + @Value("${ebean.minConnections:1}") + private Integer ebeanMinConnections; + + @Value("${ebean.maxInactiveTimeSeconds:120}") + private Integer ebeanMaxInactiveTimeSecs; + + @Value("${ebean.maxAgeMinutes:120}") + private Integer ebeanMaxAgeMinutes; + + @Value("${ebean.leakTimeMinutes:15}") + private Integer ebeanLeakTimeMinutes; + + @Value("${ebean.waitTimeoutMillis:1000}") + private Integer ebeanWaitTimeoutMillis; + + @Value("${ebean.autoCreateDdl:false}") + private Boolean ebeanAutoCreate; + + @Value("${ebean.postgresUseIamAuth:false}") + private Boolean postgresUseIamAuth; + + + @Bean("ebeanDataSourceConfig") + @Primary + public DataSourceConfig buildDataSourceConfig( + @Value("${ebean.url}") String dataSourceUrl, + @Qualifier("parseqEngineThreads") int ebeanMaxConnections + ) { + DataSourceConfig dataSourceConfig = new DataSourceConfig(); + dataSourceConfig.setUsername(ebeanDatasourceUsername); + dataSourceConfig.setPassword(ebeanDatasourcePassword); + dataSourceConfig.setUrl(dataSourceUrl); + dataSourceConfig.setDriver(ebeanDatasourceDriver); + dataSourceConfig.setMinConnections(ebeanMinConnections); + dataSourceConfig.setMaxConnections(ebeanMaxConnections); + dataSourceConfig.setMaxInactiveTimeSecs(ebeanMaxInactiveTimeSecs); + dataSourceConfig.setMaxAgeMinutes(ebeanMaxAgeMinutes); + dataSourceConfig.setLeakTimeMinutes(ebeanLeakTimeMinutes); + dataSourceConfig.setWaitTimeoutMillis(ebeanWaitTimeoutMillis); + dataSourceConfig.setListener(getListenerToTrackCounts("mce-consumer")); + // Adding IAM auth access for AWS Postgres + if (postgresUseIamAuth) { + Map custom = new HashMap<>(); + custom.put("wrapperPlugins", "iam"); + dataSourceConfig.setCustomProperties(custom); + } + return dataSourceConfig; + } +} diff --git a/metadata-jobs/mce-consumer-job/src/main/java/com/linkedin/metadata/restli/RestliServletConfig.java b/metadata-jobs/mce-consumer-job/src/main/java/com/linkedin/metadata/restli/RestliServletConfig.java new file mode 100644 index 00000000000000..3641fe692b449b --- /dev/null +++ b/metadata-jobs/mce-consumer-job/src/main/java/com/linkedin/metadata/restli/RestliServletConfig.java @@ -0,0 +1,38 @@ +package com.linkedin.metadata.restli; + +import com.linkedin.entity.client.RestliEntityClient; +import com.linkedin.restli.client.Client; +import com.linkedin.restli.server.RestliHandlerServlet; +import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.boot.web.servlet.ServletRegistrationBean; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.context.annotation.Primary; + +import java.net.URI; + +@Configuration +public class RestliServletConfig { + @Value("${server.port}") + private int configuredPort; + + @Bean("restliEntityClient") + @Primary + public RestliEntityClient restliEntityClient() { + String selfUri = String.format("http://localhost:%s/gms/", configuredPort); + final Client restClient = DefaultRestliClientFactory.getRestLiClient(URI.create(selfUri), null); + return new RestliEntityClient(restClient); + } + + @Bean + public ServletRegistrationBean servletRegistrationBean( + @Qualifier("restliHandlerServlet") RestliHandlerServlet servlet) { + return new ServletRegistrationBean<>(servlet, "/gms/*"); + } + + @Bean + public RestliHandlerServlet restliHandlerServlet() { + return new RestliHandlerServlet(); + } +} diff --git a/metadata-jobs/mce-consumer-job/src/main/resources/application.properties b/metadata-jobs/mce-consumer-job/src/main/resources/application.properties index 32348239b17aee..ff08c03f9a2e87 100644 --- a/metadata-jobs/mce-consumer-job/src/main/resources/application.properties +++ b/metadata-jobs/mce-consumer-job/src/main/resources/application.properties @@ -1,5 +1,12 @@ server.port=9090 +datahub.gms.uri=http://localhost:9090/gms/ management.endpoints.web.exposure.include=metrics, health, info spring.mvc.servlet.path=/ management.health.elasticsearch.enabled=false management.health.neo4j.enabled=false +ingestion.enabled=false +authentication.enabled=false +spring.main.allow-bean-definition-overriding=true +#--- +spring.config.activate.on-profile=test +server.port=0 diff --git a/metadata-jobs/mce-consumer-job/src/test/java/com/linkedin/metadata/kafka/MceConsumerApplicationTest.java b/metadata-jobs/mce-consumer-job/src/test/java/com/linkedin/metadata/kafka/MceConsumerApplicationTest.java new file mode 100644 index 00000000000000..059c4ac2630647 --- /dev/null +++ b/metadata-jobs/mce-consumer-job/src/test/java/com/linkedin/metadata/kafka/MceConsumerApplicationTest.java @@ -0,0 +1,38 @@ +package com.linkedin.metadata.kafka; + +import com.linkedin.metadata.entity.EntityService; +import com.linkedin.metadata.entity.restoreindices.RestoreIndicesResult; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.boot.test.web.client.TestRestTemplate; +import org.springframework.test.context.ActiveProfiles; +import org.springframework.test.context.testng.AbstractTestNGSpringContextTests; +import org.testng.annotations.Test; + + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.when; +import static org.testng.AssertJUnit.assertTrue; + +@ActiveProfiles("test") +@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT, + classes = {MceConsumerApplication.class, MceConsumerApplicationTestConfiguration.class}) +public class MceConsumerApplicationTest extends AbstractTestNGSpringContextTests { + + @Autowired + private TestRestTemplate restTemplate; + + @Autowired + private EntityService mockEntityService; + + @Test + public void testRestliServletConfig() { + RestoreIndicesResult mockResult = new RestoreIndicesResult(); + mockResult.setRowsMigrated(100); + when(mockEntityService.restoreIndices(any(), any())).thenReturn(mockResult); + + String response = this.restTemplate + .postForObject("/gms/aspects?action=restoreIndices", "{\"urn\":\"\"}", String.class); + assertTrue(response.contains(mockResult.toString())); + } +} diff --git a/metadata-jobs/mce-consumer-job/src/test/java/com/linkedin/metadata/kafka/MceConsumerApplicationTestConfiguration.java b/metadata-jobs/mce-consumer-job/src/test/java/com/linkedin/metadata/kafka/MceConsumerApplicationTestConfiguration.java new file mode 100644 index 00000000000000..8af20c7887949e --- /dev/null +++ b/metadata-jobs/mce-consumer-job/src/test/java/com/linkedin/metadata/kafka/MceConsumerApplicationTestConfiguration.java @@ -0,0 +1,69 @@ +package com.linkedin.metadata.kafka; + +import com.linkedin.entity.client.RestliEntityClient; +import com.linkedin.metadata.entity.EntityService; +import com.linkedin.metadata.graph.SiblingGraphService; +import com.linkedin.metadata.models.registry.ConfigEntityRegistry; +import com.linkedin.metadata.models.registry.EntityRegistry; +import com.linkedin.metadata.restli.DefaultRestliClientFactory; +import com.linkedin.metadata.timeseries.elastic.ElasticSearchTimeseriesAspectService; +import com.linkedin.restli.client.Client; +import io.ebean.EbeanServer; +import org.mockito.Mockito; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.test.context.TestConfiguration; +import org.springframework.boot.test.web.client.TestRestTemplate; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Primary; + +import java.net.URI; + +@TestConfiguration +public class MceConsumerApplicationTestConfiguration { + + @Autowired + private TestRestTemplate restTemplate; + + @Bean("entityService") + @Primary + public EntityService entityService() { + return Mockito.mock(EntityService.class); + } + + @Bean("restliEntityClient") + @Primary + public RestliEntityClient restliEntityClient() { + String selfUri = restTemplate.getRootUri(); + final Client restClient = DefaultRestliClientFactory.getRestLiClient(URI.create(selfUri), null); + return new RestliEntityClient(restClient); + } + + @Bean + @Primary + public EbeanServer ebeanServer() { + return Mockito.mock(EbeanServer.class); + } + + @Bean(name = "elasticSearchTimeseriesAspectService") + @Primary + protected ElasticSearchTimeseriesAspectService elasticSearchTimeseriesAspectService() { + return Mockito.mock(ElasticSearchTimeseriesAspectService.class); + } + + @Bean("entityRegistry") + @Primary + protected EntityRegistry entityRegistry() { + return Mockito.mock(EntityRegistry.class); + } + + @Bean("configEntityRegistry") + protected ConfigEntityRegistry configEntityRegistry() { + return Mockito.mock(ConfigEntityRegistry.class); + } + + @Bean("siblingGraphService") + @Primary + protected SiblingGraphService siblingGraphService() { + return Mockito.mock(SiblingGraphService.class); + } +} diff --git a/metadata-service/factories/build.gradle b/metadata-service/factories/build.gradle index c4417c9232ce6b..82965f9b12542c 100644 --- a/metadata-service/factories/build.gradle +++ b/metadata-service/factories/build.gradle @@ -33,6 +33,7 @@ dependencies { annotationProcessor externalDependency.lombok compile spec.product.pegasus.restliSpringBridge + implementation spec.product.pegasus.restliDocgen testImplementation externalDependency.springBootTest diff --git a/metadata-service/factories/src/main/java/com/linkedin/gms/factory/common/LocalEbeanServerConfigFactory.java b/metadata-service/factories/src/main/java/com/linkedin/gms/factory/common/LocalEbeanServerConfigFactory.java index 66d917b444e01c..5ab5b14160e279 100644 --- a/metadata-service/factories/src/main/java/com/linkedin/gms/factory/common/LocalEbeanServerConfigFactory.java +++ b/metadata-service/factories/src/main/java/com/linkedin/gms/factory/common/LocalEbeanServerConfigFactory.java @@ -9,6 +9,7 @@ import java.util.HashMap; import java.util.Map; import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @@ -26,9 +27,6 @@ public class LocalEbeanServerConfigFactory { @Value("${ebean.password}") private String ebeanDatasourcePassword; - @Value("${ebean.url}") - private String ebeanDatasourceUrl; - @Value("${ebean.driver}") private String ebeanDatasourceDriver; @@ -56,7 +54,7 @@ public class LocalEbeanServerConfigFactory { @Value("${ebean.postgresUseIamAuth:false}") private Boolean postgresUseIamAuth; - private DataSourcePoolListener getListenerToTrackCounts(String metricName) { + public static DataSourcePoolListener getListenerToTrackCounts(String metricName) { final String counterName = "ebeans_connection_pool_size_" + metricName; return new DataSourcePoolListener() { @Override @@ -71,7 +69,8 @@ public void onBeforeReturnConnection(Connection connection) { }; } - private DataSourceConfig buildDataSourceConfig(String dataSourceUrl, String dataSourceType) { + @Bean("ebeanDataSourceConfig") + public DataSourceConfig buildDataSourceConfig(@Value("${ebean.url}") String dataSourceUrl) { DataSourceConfig dataSourceConfig = new DataSourceConfig(); dataSourceConfig.setUsername(ebeanDatasourceUsername); dataSourceConfig.setPassword(ebeanDatasourcePassword); @@ -83,7 +82,7 @@ private DataSourceConfig buildDataSourceConfig(String dataSourceUrl, String data dataSourceConfig.setMaxAgeMinutes(ebeanMaxAgeMinutes); dataSourceConfig.setLeakTimeMinutes(ebeanLeakTimeMinutes); dataSourceConfig.setWaitTimeoutMillis(ebeanWaitTimeoutMillis); - dataSourceConfig.setListener(getListenerToTrackCounts(dataSourceType)); + dataSourceConfig.setListener(getListenerToTrackCounts("main")); // Adding IAM auth access for AWS Postgres if (postgresUseIamAuth) { Map custom = new HashMap<>(); @@ -94,10 +93,10 @@ private DataSourceConfig buildDataSourceConfig(String dataSourceUrl, String data } @Bean(name = "gmsEbeanServiceConfig") - protected ServerConfig createInstance() { + protected ServerConfig createInstance(@Qualifier("ebeanDataSourceConfig") DataSourceConfig config) { ServerConfig serverConfig = new ServerConfig(); serverConfig.setName("gmsEbeanServiceConfig"); - serverConfig.setDataSourceConfig(buildDataSourceConfig(ebeanDatasourceUrl, "main")); + serverConfig.setDataSourceConfig(config); serverConfig.setDdlGenerate(ebeanAutoCreate); serverConfig.setDdlRun(ebeanAutoCreate); return serverConfig; diff --git a/metadata-service/factories/src/main/java/com/linkedin/gms/factory/entity/RestliEntityClientFactory.java b/metadata-service/factories/src/main/java/com/linkedin/gms/factory/entity/RestliEntityClientFactory.java index a7a911c68c2d46..fd4f62d0255605 100644 --- a/metadata-service/factories/src/main/java/com/linkedin/gms/factory/entity/RestliEntityClientFactory.java +++ b/metadata-service/factories/src/main/java/com/linkedin/gms/factory/entity/RestliEntityClientFactory.java @@ -9,6 +9,8 @@ import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.PropertySource; +import java.net.URI; + @Configuration @PropertySource(value = "classpath:/application.yml", factory = YamlPropertySourceFactory.class) @@ -23,12 +25,20 @@ public class RestliEntityClientFactory { @Value("${datahub.gms.useSSL}") private boolean gmsUseSSL; + @Value("${datahub.gms.uri}") + private String gmsUri; + @Value("${datahub.gms.sslContext.protocol}") private String gmsSslProtocol; @Bean("restliEntityClient") public RestliEntityClient getRestliEntityClient() { - Client restClient = DefaultRestliClientFactory.getRestLiClient(gmsHost, gmsPort, gmsUseSSL, gmsSslProtocol); + final Client restClient; + if (gmsUri != null) { + restClient = DefaultRestliClientFactory.getRestLiClient(URI.create(gmsUri), gmsSslProtocol); + } else { + restClient = DefaultRestliClientFactory.getRestLiClient(gmsHost, gmsPort, gmsUseSSL, gmsSslProtocol); + } return new RestliEntityClient(restClient); } } diff --git a/metadata-service/factories/src/main/java/com/linkedin/gms/factory/entityregistry/ConfigEntityRegistryFactory.java b/metadata-service/factories/src/main/java/com/linkedin/gms/factory/entityregistry/ConfigEntityRegistryFactory.java index d5cd315a0ada46..471f079683d605 100644 --- a/metadata-service/factories/src/main/java/com/linkedin/gms/factory/entityregistry/ConfigEntityRegistryFactory.java +++ b/metadata-service/factories/src/main/java/com/linkedin/gms/factory/entityregistry/ConfigEntityRegistryFactory.java @@ -9,6 +9,7 @@ import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.PropertySource; +import org.springframework.core.io.Resource; @Configuration @@ -18,13 +19,16 @@ public class ConfigEntityRegistryFactory { @Value("${configEntityRegistry.path}") private String entityRegistryConfigPath; -// @Value("${configEntityRegistry.classpath}") -// @Value("${ENTITY_REGISTRY_CLASS_PATH:../../metadata-custom-models/build/libs/}") -// private String entityRegistryClassPath; + @Value("${configEntityRegistry.resource}") + Resource entityRegistryResource; @Bean(name = "configEntityRegistry") @Nonnull protected ConfigEntityRegistry getInstance() throws IOException, EntityRegistryException { - return new ConfigEntityRegistry(entityRegistryConfigPath); + if (entityRegistryConfigPath != null) { + return new ConfigEntityRegistry(entityRegistryConfigPath); + } else { + return new ConfigEntityRegistry(entityRegistryResource.getInputStream()); + } } } diff --git a/metadata-service/factories/src/main/java/com/linkedin/restli/server/RAPServletFactory.java b/metadata-service/factories/src/main/java/com/linkedin/restli/server/RAPServletFactory.java new file mode 100644 index 00000000000000..47ab2f77935a26 --- /dev/null +++ b/metadata-service/factories/src/main/java/com/linkedin/restli/server/RAPServletFactory.java @@ -0,0 +1,53 @@ +package com.linkedin.restli.server; + +import com.linkedin.metadata.filter.RestliLoggingFilter; +import com.linkedin.parseq.Engine; +import com.linkedin.parseq.EngineBuilder; +import com.linkedin.r2.filter.FilterChains; +import com.linkedin.r2.filter.transport.FilterChainDispatcher; +import com.linkedin.r2.transport.http.server.RAPServlet; +import com.linkedin.restli.docgen.DefaultDocumentationRequestHandler; +import com.linkedin.restli.server.spring.SpringInjectResourceFactory; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +import java.util.concurrent.Executors; + +@Slf4j +@Configuration +public class RAPServletFactory { + @Value("#{systemEnvironment['RESTLI_SERVLET_THREADS']}") + private Integer environmentThreads; + + @Bean(name = "restliSpringInjectResourceFactory") + public SpringInjectResourceFactory springInjectResourceFactory() { + return new SpringInjectResourceFactory(); + } + + @Bean("parseqEngineThreads") + public int parseqEngineThreads() { + return environmentThreads != null ? environmentThreads : (Runtime.getRuntime().availableProcessors() + 1); + } + @Bean + public RAPServlet rapServlet( + @Qualifier("restliSpringInjectResourceFactory") SpringInjectResourceFactory springInjectResourceFactory, + @Qualifier("parseqEngineThreads") int threads) { + log.info("Starting restli servlet with {} threads.", threads); + Engine parseqEngine = new EngineBuilder() + .setTaskExecutor(Executors.newFixedThreadPool(threads)) + .setTimerScheduler(Executors.newSingleThreadScheduledExecutor()) + .build(); + + RestLiConfig config = new RestLiConfig(); + config.setDocumentationRequestHandler(new DefaultDocumentationRequestHandler()); + config.setResourcePackageNames("com.linkedin.metadata.resources"); + config.addFilter(new RestliLoggingFilter()); + + RestLiServer restLiServer = new RestLiServer(config, springInjectResourceFactory, parseqEngine); + return new RAPServlet(new FilterChainDispatcher(new DelegatingTransportDispatcher(restLiServer, restLiServer), + FilterChains.empty())); + } +} diff --git a/metadata-service/factories/src/main/java/com/linkedin/restli/server/RestliHandlerServlet.java b/metadata-service/factories/src/main/java/com/linkedin/restli/server/RestliHandlerServlet.java new file mode 100644 index 00000000000000..723f0333999dd7 --- /dev/null +++ b/metadata-service/factories/src/main/java/com/linkedin/restli/server/RestliHandlerServlet.java @@ -0,0 +1,28 @@ +package com.linkedin.restli.server; + +import com.linkedin.r2.transport.http.server.RAPServlet; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; +import org.springframework.web.HttpRequestHandler; +import org.springframework.web.context.support.HttpRequestHandlerServlet; + +import javax.servlet.ServletException; +import javax.servlet.http.HttpServletRequest; +import javax.servlet.http.HttpServletResponse; +import java.io.IOException; + +@Component +public class RestliHandlerServlet extends HttpRequestHandlerServlet implements HttpRequestHandler { + @Autowired + private RAPServlet _r2Servlet; + + @Override + public void service(HttpServletRequest req, HttpServletResponse res) throws ServletException, IOException { + _r2Servlet.service(req, res); + } + + @Override + public void handleRequest(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException { + service(request, response); + } +} diff --git a/metadata-service/factories/src/main/java/com/linkedin/restli/server/spring/ParallelRestliHttpRequestHandler.java b/metadata-service/factories/src/main/java/com/linkedin/restli/server/spring/ParallelRestliHttpRequestHandler.java deleted file mode 100644 index 3b94de67fe0516..00000000000000 --- a/metadata-service/factories/src/main/java/com/linkedin/restli/server/spring/ParallelRestliHttpRequestHandler.java +++ /dev/null @@ -1,65 +0,0 @@ -/* - Copyright (c) 2013 LinkedIn Corp. - - Licensed under the Apache License, Version 2.0 (the "License"); - you may not use this file except in compliance with the License. - You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. - */ -package com.linkedin.restli.server.spring; - -import com.linkedin.metadata.filter.RestliLoggingFilter; -import com.linkedin.parseq.Engine; -import com.linkedin.parseq.EngineBuilder; -import com.linkedin.r2.filter.FilterChain; -import com.linkedin.r2.filter.FilterChains; -import com.linkedin.r2.filter.transport.FilterChainDispatcher; -import com.linkedin.r2.transport.http.server.RAPServlet; -import com.linkedin.restli.server.DelegatingTransportDispatcher; -import com.linkedin.restli.server.RestLiConfig; -import com.linkedin.restli.server.RestLiServer; -import java.io.IOException; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import javax.servlet.ServletException; -import javax.servlet.http.HttpServletRequest; -import javax.servlet.http.HttpServletResponse; -import org.springframework.web.HttpRequestHandler; - -public class ParallelRestliHttpRequestHandler implements HttpRequestHandler { - - private RAPServlet _r2Servlet; - - public ParallelRestliHttpRequestHandler(RestLiConfig config, SpringInjectResourceFactory injectResourceFactory) { - this(config, injectResourceFactory, FilterChains.empty()); - } - - public ParallelRestliHttpRequestHandler(RestLiConfig config, SpringInjectResourceFactory injectResourceFactory, - FilterChain filterChain) { - config.addFilter(new RestliLoggingFilter()); - RestLiServer restLiServer = new RestLiServer(config, injectResourceFactory, getDefaultParseqEngine()); - _r2Servlet = new RAPServlet( - new FilterChainDispatcher(new DelegatingTransportDispatcher(restLiServer, restLiServer), filterChain)); - } - - public Engine getDefaultParseqEngine() { - final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1); - Engine engine = new EngineBuilder().setTaskExecutor(scheduler).setTimerScheduler(scheduler).build(); - return engine; - } - - public ParallelRestliHttpRequestHandler(RAPServlet r2Servlet) { - _r2Servlet = r2Servlet; - } - - public void handleRequest(HttpServletRequest req, HttpServletResponse res) throws ServletException, IOException { - _r2Servlet.service(req, res); - } -} diff --git a/metadata-service/factories/src/main/resources/application.yml b/metadata-service/factories/src/main/resources/application.yml index 01aa12fca0478c..2831d923274d33 100644 --- a/metadata-service/factories/src/main/resources/application.yml +++ b/metadata-service/factories/src/main/resources/application.yml @@ -53,6 +53,10 @@ datahub: host: ${DATAHUB_GMS_HOST:localhost} port: ${DATAHUB_GMS_PORT:8080} useSSL: ${DATAHUB_GMS_USE_SSL:${GMS_USE_SSL:false}} + + # URI instead of above host/port/ssl + uri: ${DATAHUB_GMS_URI:#{null}} + sslContext: protocol: ${DATAHUB_GMS_SSL_PROTOCOL:${GMS_SSL_PROTOCOL:#{null}}} @@ -79,8 +83,8 @@ searchService: enableCache: ${SEARCH_SERVICE_ENABLE_CACHE:false} configEntityRegistry: - # TODO: Change to read from resources on classpath. path: ${ENTITY_REGISTRY_CONFIG_PATH:../../metadata-models/src/main/resources/entity-registry.yml} + resource: ${ENTITY_REGISTRY_CONFIG_CLASSPATH:classpath:/entity-registry.yml} platformAnalytics: enabled: ${DATAHUB_ANALYTICS_ENABLED:true} diff --git a/metadata-service/restli-client/src/main/java/com/linkedin/common/client/BaseClient.java b/metadata-service/restli-client/src/main/java/com/linkedin/common/client/BaseClient.java index 49d6edadcca9eb..69ee8e4e2e16a4 100644 --- a/metadata-service/restli-client/src/main/java/com/linkedin/common/client/BaseClient.java +++ b/metadata-service/restli-client/src/main/java/com/linkedin/common/client/BaseClient.java @@ -2,13 +2,19 @@ import com.datahub.authentication.Authentication; import com.linkedin.common.callback.FutureCallback; +import com.linkedin.metadata.utils.metrics.MetricUtils; +import com.linkedin.parseq.retry.backoff.BackoffPolicy; +import com.linkedin.parseq.retry.backoff.ExponentialBackoff; import com.linkedin.r2.RemoteInvocationException; + +import java.util.Objects; +import javax.annotation.Nonnull; + import com.linkedin.restli.client.AbstractRequestBuilder; import com.linkedin.restli.client.Client; import com.linkedin.restli.client.Request; import com.linkedin.restli.client.Response; -import java.util.Objects; -import javax.annotation.Nonnull; +import lombok.SneakyThrows; import lombok.extern.slf4j.Slf4j; import org.apache.http.HttpHeaders; @@ -17,9 +23,17 @@ public abstract class BaseClient implements AutoCloseable { protected final Client _client; + protected final BackoffPolicy _backoffPolicy; + protected final int _retryCount; protected BaseClient(@Nonnull Client restliClient) { + this(restliClient, new ExponentialBackoff(2), 3); + } + + protected BaseClient(@Nonnull Client restliClient, BackoffPolicy backoffPolicy, int retryCount) { _client = Objects.requireNonNull(restliClient); + _backoffPolicy = backoffPolicy; + _retryCount = retryCount; } protected Response sendClientRequest(final AbstractRequestBuilder> requestBuilder) throws RemoteInvocationException { @@ -29,11 +43,30 @@ protected Response sendClientRequest(final AbstractRequestBuilder Response sendClientRequest( final AbstractRequestBuilder> requestBuilder, @Nonnull final Authentication authentication) throws RemoteInvocationException { requestBuilder.addHeader(HttpHeaders.AUTHORIZATION, authentication.getCredentials()); - return _client.sendRequest(requestBuilder.build()).getResponse(); + + int attemptCount = 0; + + while (attemptCount < _retryCount) { + try { + return _client.sendRequest(requestBuilder.build()).getResponse(); + } catch (Exception ex) { + MetricUtils.counter(BaseClient.class, "exception" + MetricUtils.DELIMITER + ex.getClass().getName().toLowerCase()).inc(); + + if (attemptCount == _retryCount - 1) { + throw ex; + } else { + attemptCount = attemptCount + 1; + Thread.sleep(_backoffPolicy.nextBackoff(attemptCount, ex) * 1000); + } + } + } + + throw new IllegalStateException(); } @Override diff --git a/metadata-service/restli-client/src/main/java/com/linkedin/entity/client/EntityClient.java b/metadata-service/restli-client/src/main/java/com/linkedin/entity/client/EntityClient.java index a37063bd3fb57f..7b577c99b2b16f 100644 --- a/metadata-service/restli-client/src/main/java/com/linkedin/entity/client/EntityClient.java +++ b/metadata-service/restli-client/src/main/java/com/linkedin/entity/client/EntityClient.java @@ -295,7 +295,7 @@ default String wrappedIngestProposal(@Nonnull MetadataChangeProposal metadataCha @Nonnull final Authentication authentication, final boolean async) { try { return ingestProposal(metadataChangeProposal, authentication, async); - } catch (RemoteInvocationException e) { + } catch (Exception e) { throw new RuntimeException(e); } } diff --git a/metadata-service/restli-client/src/main/java/com/linkedin/entity/client/RestliEntityClient.java b/metadata-service/restli-client/src/main/java/com/linkedin/entity/client/RestliEntityClient.java index f1e5352c9da981..b989bdb9d5b4b5 100644 --- a/metadata-service/restli-client/src/main/java/com/linkedin/entity/client/RestliEntityClient.java +++ b/metadata-service/restli-client/src/main/java/com/linkedin/entity/client/RestliEntityClient.java @@ -626,7 +626,8 @@ public List getTimeseriesAspectValues(@Nonnull String urn, @Non */ @Override public String ingestProposal(@Nonnull final MetadataChangeProposal metadataChangeProposal, - @Nonnull final Authentication authentication, final boolean async) throws RemoteInvocationException { + @Nonnull final Authentication authentication, + final boolean async) throws RemoteInvocationException { final AspectsDoIngestProposalRequestBuilder requestBuilder = ASPECTS_REQUEST_BUILDERS.actionIngestProposal().proposalParam(metadataChangeProposal).asyncParam(String.valueOf(async)); return sendClientRequest(requestBuilder, authentication).getEntity(); diff --git a/metadata-service/war/build.gradle b/metadata-service/war/build.gradle index cdb2b67b423734..0e5915b69db6ca 100644 --- a/metadata-service/war/build.gradle +++ b/metadata-service/war/build.gradle @@ -13,7 +13,6 @@ dependencies { runtime project(':metadata-jobs:mae-consumer') runtime project(':metadata-jobs:pe-consumer') - runtime externalDependency.logbackClassic runtime externalDependency.awsSecretsManagerJdbc runtime externalDependency.h2 runtime externalDependency.mariadbConnector @@ -24,6 +23,8 @@ dependencies { runtime spec.product.pegasus.restliDocgen runtime spec.product.pegasus.restliSpringBridge + implementation externalDependency.log4j2Api + implementation externalDependency.logbackClassic implementation externalDependency.awsMskIamAuth } diff --git a/metadata-service/war/src/main/webapp/WEB-INF/beans.xml b/metadata-service/war/src/main/webapp/WEB-INF/beans.xml index bb2a833253d73b..10aa36263f9388 100644 --- a/metadata-service/war/src/main/webapp/WEB-INF/beans.xml +++ b/metadata-service/war/src/main/webapp/WEB-INF/beans.xml @@ -10,7 +10,7 @@ http://www.springframework.org/schema/task/spring-task-3.0.xsd" > - + @@ -22,15 +22,7 @@ In web.xml, HttpRequestHandlerServlet loads this "restliRequestHandler" spring bean as a servlet. For details, see: http://static.springsource.org/spring-framework/docs/3.2.0.RC1/api/org/springframework/web/context/support/HttpRequestHandlerServlet.html --> - - - - - - - - - + diff --git a/metadata-utils/src/main/java/com/linkedin/metadata/restli/DefaultRestliClientFactory.java b/metadata-utils/src/main/java/com/linkedin/metadata/restli/DefaultRestliClientFactory.java index b2e023527a1855..436c7ae5d77b55 100644 --- a/metadata-utils/src/main/java/com/linkedin/metadata/restli/DefaultRestliClientFactory.java +++ b/metadata-utils/src/main/java/com/linkedin/metadata/restli/DefaultRestliClientFactory.java @@ -15,6 +15,7 @@ import javax.annotation.Nullable; import javax.net.ssl.SSLContext; import javax.net.ssl.SSLParameters; +import java.net.URI; import java.security.InvalidParameterException; import java.security.NoSuchAlgorithmException; import java.util.Collections; @@ -41,49 +42,39 @@ public static RestClient getRestLiD2Client(@Nonnull String restLiClientD2ZkHost, } @Nonnull - public static RestClient getRestLiClient(@Nonnull String restLiServerHost, int restLiServerPort) { - return getRestLiClient(restLiServerHost, restLiServerPort, false, null); + public static RestClient getRestLiClient(@Nonnull String restLiServerHost, int restLiServerPort, boolean useSSL, + @Nullable String sslProtocol) { + return getRestLiClient( + URI.create(String.format("%s://%s:%s", useSSL ? "https" : "http", restLiServerHost, restLiServerPort)), + sslProtocol); } @Nonnull - public static RestClient getRestLiClient(@Nonnull String restLiServerHost, int restLiServerPort, boolean useSSL, - @Nullable String sslProtocol) { - if (StringUtils.isBlank(restLiServerHost) || restLiServerPort <= 0) { + public static RestClient getRestLiClient(@Nonnull URI gmsUri, @Nullable String sslProtocol) { + if (StringUtils.isBlank(gmsUri.getHost()) || gmsUri.getPort() <= 0) { throw new InvalidParameterException("Invalid restli server host name or port!"); } - if (useSSL) { - return getHttpsRestClient(restLiServerHost, restLiServerPort, sslProtocol); - } else { - return getHttpRestClient(restLiServerHost, restLiServerPort); - } - } - - private static RestClient getHttpsRestClient(@Nonnull String restLiServerHost, int restLiServerPort, - @Nullable String sslProtocol) { Map params = new HashMap<>(); - - try { - params.put(HttpClientFactory.HTTP_SSL_CONTEXT, SSLContext.getDefault()); - } catch (NoSuchAlgorithmException ex) { - throw new RuntimeException(ex); - } - SSLParameters sslParameters = new SSLParameters(); - if (sslProtocol != null) { - sslParameters.setProtocols(new String[]{sslProtocol}); + if ("https".equals(gmsUri.getScheme())) { + try { + params.put(HttpClientFactory.HTTP_SSL_CONTEXT, SSLContext.getDefault()); + } catch (NoSuchAlgorithmException ex) { + throw new RuntimeException(ex); + } + + SSLParameters sslParameters = new SSLParameters(); + if (sslProtocol != null) { + sslParameters.setProtocols(new String[]{sslProtocol}); + } + params.put(HttpClientFactory.HTTP_SSL_PARAMS, sslParameters); } - params.put(HttpClientFactory.HTTP_SSL_PARAMS, sslParameters); - - return getHttpRestClient("https", restLiServerHost, restLiServerPort, params); - } - private static RestClient getHttpRestClient(@Nonnull String restLiServerHost, int restLiServerPort) { - return getHttpRestClient("http", restLiServerHost, restLiServerPort, new HashMap<>()); + return getHttpRestClient(gmsUri, params); } - private static RestClient getHttpRestClient(@Nonnull String scheme, @Nonnull String restLiServerHost, - int restLiServerPort, @Nonnull Map params) { + private static RestClient getHttpRestClient(@Nonnull URI gmsUri, @Nonnull Map params) { Map finalParams = new HashMap<>(); finalParams.put(HttpClientFactory.HTTP_REQUEST_TIMEOUT, DEFAULT_REQUEST_TIMEOUT_IN_MS); finalParams.putAll(params); @@ -91,6 +82,7 @@ private static RestClient getHttpRestClient(@Nonnull String scheme, @Nonnull Str HttpClientFactory http = new HttpClientFactory.Builder().build(); TransportClient transportClient = http.getClient(Collections.unmodifiableMap(finalParams)); Client r2Client = new TransportClientAdapter(transportClient); - return new RestClient(r2Client, scheme + "://" + restLiServerHost + ":" + restLiServerPort + "/"); + String uriPrefix = gmsUri.getPath().endsWith("/") ? gmsUri.toString() : gmsUri + "/"; + return new RestClient(r2Client, uriPrefix); } }