From 821bff80e800dd5a6abbe605dd2605068f97929b Mon Sep 17 00:00:00 2001 From: david-leifker <114954101+david-leifker@users.noreply.github.com> Date: Tue, 27 Dec 2022 12:35:48 -0600 Subject: [PATCH] fix(kafka): datahub-upgrade job (#6864) --- datahub-upgrade/build.gradle | 4 ++ .../upgrade/UpgradeCliApplication.java | 10 +++-- .../upgrade/UpgradeCliApplicationTest.java | 29 ++++++++++++++ ...pgradeCliApplicationTestConfiguration.java | 38 +++++++++++++++++++ docker/kafka-setup/kafka-setup.sh | 21 ++++++---- 5 files changed, 92 insertions(+), 10 deletions(-) create mode 100644 datahub-upgrade/src/test/java/com/linkedin/datahub/upgrade/UpgradeCliApplicationTest.java create mode 100644 datahub-upgrade/src/test/java/com/linkedin/datahub/upgrade/UpgradeCliApplicationTestConfiguration.java diff --git a/datahub-upgrade/build.gradle b/datahub-upgrade/build.gradle index 2e31f00e92b947..d8cbd998431a46 100644 --- a/datahub-upgrade/build.gradle +++ b/datahub-upgrade/build.gradle @@ -46,6 +46,10 @@ dependencies { annotationProcessor externalDependency.lombok annotationProcessor externalDependency.picocli + + testImplementation externalDependency.springBootTest + testImplementation externalDependency.mockito + testImplementation externalDependency.testng } bootJar { diff --git a/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/UpgradeCliApplication.java b/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/UpgradeCliApplication.java index 2314f4e8141842..53a5c0758f3189 100644 --- a/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/UpgradeCliApplication.java +++ b/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/UpgradeCliApplication.java @@ -11,9 +11,13 @@ @SuppressWarnings("checkstyle:HideUtilityClassConstructor") @SpringBootApplication(exclude = {ElasticsearchRestClientAutoConfiguration.class}) -@ComponentScan(basePackages = {"com.linkedin.gms.factory", "com.linkedin.datahub.upgrade.config"}, - excludeFilters = { - @ComponentScan.Filter(type = 
FilterType.ASSIGNABLE_TYPE, classes = ScheduledAnalyticsFactory.class)}) +@ComponentScan(basePackages = { + "com.linkedin.gms.factory", + "com.linkedin.datahub.upgrade.config", + "com.linkedin.metadata.dao.producer" +}, excludeFilters = { + @ComponentScan.Filter(type = FilterType.ASSIGNABLE_TYPE, classes = ScheduledAnalyticsFactory.class) +}) public class UpgradeCliApplication { public static void main(String[] args) { new SpringApplicationBuilder(UpgradeCliApplication.class, UpgradeCli.class).web(WebApplicationType.NONE).run(args); diff --git a/datahub-upgrade/src/test/java/com/linkedin/datahub/upgrade/UpgradeCliApplicationTest.java b/datahub-upgrade/src/test/java/com/linkedin/datahub/upgrade/UpgradeCliApplicationTest.java new file mode 100644 index 00000000000000..33b22206b4b8cb --- /dev/null +++ b/datahub-upgrade/src/test/java/com/linkedin/datahub/upgrade/UpgradeCliApplicationTest.java @@ -0,0 +1,29 @@ +package com.linkedin.datahub.upgrade; + +import com.linkedin.datahub.upgrade.restoreindices.RestoreIndices; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.test.context.ActiveProfiles; +import org.springframework.test.context.testng.AbstractTestNGSpringContextTests; +import org.testng.annotations.Test; + +import javax.inject.Named; + +import static org.testng.AssertJUnit.assertEquals; + +@ActiveProfiles("test") +@SpringBootTest(classes = {UpgradeCliApplication.class, UpgradeCliApplicationTestConfiguration.class}) +public class UpgradeCliApplicationTest extends AbstractTestNGSpringContextTests { + + @Autowired + @Named("restoreIndices") + private RestoreIndices restoreIndices; + + @Test + public void testKafkaHealthCheck() { + /* + This may look like a trivial test; however, it exercises the Spring autowiring of the Kafka health check bean. + */ + assertEquals(3, restoreIndices.steps().size()); + } +} diff --git 
a/datahub-upgrade/src/test/java/com/linkedin/datahub/upgrade/UpgradeCliApplicationTestConfiguration.java b/datahub-upgrade/src/test/java/com/linkedin/datahub/upgrade/UpgradeCliApplicationTestConfiguration.java new file mode 100644 index 00000000000000..8ae3a832d0aaf5 --- /dev/null +++ b/datahub-upgrade/src/test/java/com/linkedin/datahub/upgrade/UpgradeCliApplicationTestConfiguration.java @@ -0,0 +1,38 @@ +package com.linkedin.datahub.upgrade; + +import com.linkedin.gms.factory.auth.SystemAuthenticationFactory; +import com.linkedin.metadata.entity.EntityService; +import com.linkedin.metadata.graph.GraphService; +import com.linkedin.metadata.models.registry.ConfigEntityRegistry; +import com.linkedin.metadata.models.registry.EntityRegistry; +import com.linkedin.metadata.search.SearchService; +import io.ebean.EbeanServer; +import org.springframework.boot.test.context.TestConfiguration; +import org.springframework.boot.test.mock.mockito.MockBean; +import org.springframework.context.annotation.Import; + +@TestConfiguration +@Import(value = {SystemAuthenticationFactory.class}) +public class UpgradeCliApplicationTestConfiguration { + + @MockBean + private UpgradeCli upgradeCli; + + @MockBean + private EbeanServer ebeanServer; + + @MockBean + private EntityService entityService; + + @MockBean + private SearchService searchService; + + @MockBean + private GraphService graphService; + + @MockBean + private EntityRegistry entityRegistry; + + @MockBean + private ConfigEntityRegistry configEntityRegistry; +} diff --git a/docker/kafka-setup/kafka-setup.sh b/docker/kafka-setup/kafka-setup.sh index 899f5924a9fe20..93b823a0af8b69 100644 --- a/docker/kafka-setup/kafka-setup.sh +++ b/docker/kafka-setup/kafka-setup.sh @@ -6,6 +6,8 @@ : ${DATAHUB_ANALYTICS_ENABLED:=true} +: ${KAFKA_HEAP_OPTS:=-Xmx64M} + CONNECTION_PROPERTIES_PATH=/tmp/connection.properties function wait_ex { @@ -60,26 +62,31 @@ if [[ -n "$KAFKA_PROPERTIES_SASL_CLIENT_CALLBACK_HANDLER_CLASS" ]]; then fi cub kafka-ready -c 
$CONNECTION_PROPERTIES_PATH -b $KAFKA_BOOTSTRAP_SERVER 1 180 + kafka-topics.sh --create --if-not-exists --command-config $CONNECTION_PROPERTIES_PATH --bootstrap-server $KAFKA_BOOTSTRAP_SERVER --partitions $PARTITIONS --replication-factor $REPLICATION_FACTOR --topic $METADATA_AUDIT_EVENT_NAME & kafka-topics.sh --create --if-not-exists --command-config $CONNECTION_PROPERTIES_PATH --bootstrap-server $KAFKA_BOOTSTRAP_SERVER --partitions $PARTITIONS --replication-factor $REPLICATION_FACTOR --topic $METADATA_CHANGE_EVENT_NAME & kafka-topics.sh --create --if-not-exists --command-config $CONNECTION_PROPERTIES_PATH --bootstrap-server $KAFKA_BOOTSTRAP_SERVER --partitions $PARTITIONS --replication-factor $REPLICATION_FACTOR --topic $FAILED_METADATA_CHANGE_EVENT_NAME & kafka-topics.sh --create --if-not-exists --command-config $CONNECTION_PROPERTIES_PATH --bootstrap-server $KAFKA_BOOTSTRAP_SERVER --partitions $PARTITIONS --replication-factor $REPLICATION_FACTOR --topic $METADATA_CHANGE_LOG_VERSIONED_TOPIC & +echo "Waiting for topic creation group 1." +result=$(wait_ex) +rc=$? +if [ $rc -ne 0 ]; then exit $rc; fi +echo "Finished topic creation group 1." 
# Set retention to 90 days kafka-topics.sh --create --if-not-exists --command-config $CONNECTION_PROPERTIES_PATH --bootstrap-server $KAFKA_BOOTSTRAP_SERVER --partitions $PARTITIONS --replication-factor $REPLICATION_FACTOR --config retention.ms=7776000000 --topic $METADATA_CHANGE_LOG_TIMESERIES_TOPIC & kafka-topics.sh --create --if-not-exists --command-config $CONNECTION_PROPERTIES_PATH --bootstrap-server $KAFKA_BOOTSTRAP_SERVER --partitions $PARTITIONS --replication-factor $REPLICATION_FACTOR --topic $METADATA_CHANGE_PROPOSAL_TOPIC & kafka-topics.sh --create --if-not-exists --command-config $CONNECTION_PROPERTIES_PATH --bootstrap-server $KAFKA_BOOTSTRAP_SERVER --partitions $PARTITIONS --replication-factor $REPLICATION_FACTOR --topic $FAILED_METADATA_CHANGE_PROPOSAL_TOPIC & kafka-topics.sh --create --if-not-exists --command-config $CONNECTION_PROPERTIES_PATH --bootstrap-server $KAFKA_BOOTSTRAP_SERVER --partitions $PARTITIONS --replication-factor $REPLICATION_FACTOR --topic $PLATFORM_EVENT_TOPIC_NAME & +echo "Waiting for topic creation group 2." +result=$(wait_ex) +rc=$? +if [ $rc -ne 0 ]; then exit $rc; fi +echo "Finished topic creation group 2." # Create topic for datahub usage event if [[ $DATAHUB_ANALYTICS_ENABLED == true ]]; then - kafka-topics.sh --create --if-not-exists --command-config $CONNECTION_PROPERTIES_PATH --bootstrap-server $KAFKA_BOOTSTRAP_SERVER --partitions $PARTITIONS --replication-factor $REPLICATION_FACTOR --topic $DATAHUB_USAGE_EVENT_NAME & + kafka-topics.sh --create --if-not-exists --command-config $CONNECTION_PROPERTIES_PATH --bootstrap-server $KAFKA_BOOTSTRAP_SERVER --partitions $PARTITIONS --replication-factor $REPLICATION_FACTOR --topic $DATAHUB_USAGE_EVENT_NAME fi -echo "Waiting for topic creation." -result=$(wait_ex) -rc=$? -if [ $rc -ne 0 ]; then exit $rc; fi -echo "Finished topic creation." 
- kafka-configs.sh --command-config $CONNECTION_PROPERTIES_PATH --bootstrap-server $KAFKA_BOOTSTRAP_SERVER --entity-type topics --entity-name _schemas --alter --add-config cleanup.policy=compact