Skip to content

Commit

Permalink
fix(kafka): datahub-upgrade job (datahub-project#6864)
Browse files Browse the repository at this point in the history
  • Loading branch information
david-leifker authored and cccs-Dustin committed Feb 1, 2023
1 parent 36f2ba0 commit 821bff8
Show file tree
Hide file tree
Showing 5 changed files with 92 additions and 10 deletions.
4 changes: 4 additions & 0 deletions datahub-upgrade/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,10 @@ dependencies {

annotationProcessor externalDependency.lombok
annotationProcessor externalDependency.picocli

// Test-only dependencies for the Spring/Kafka wiring tests of the upgrade job.
testImplementation externalDependency.springBootTest
// 'testCompile' is deprecated and removed in Gradle 7+; 'testImplementation'
// is the equivalent configuration and matches the line above.
testImplementation externalDependency.mockito
testImplementation externalDependency.testng
}

bootJar {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,13 @@

@SuppressWarnings("checkstyle:HideUtilityClassConstructor")
@SpringBootApplication(exclude = {ElasticsearchRestClientAutoConfiguration.class})
@ComponentScan(basePackages = {"com.linkedin.gms.factory", "com.linkedin.datahub.upgrade.config"},
excludeFilters = {
@ComponentScan.Filter(type = FilterType.ASSIGNABLE_TYPE, classes = ScheduledAnalyticsFactory.class)})
@ComponentScan(basePackages = {
"com.linkedin.gms.factory",
"com.linkedin.datahub.upgrade.config",
"com.linkedin.metadata.dao.producer"
}, excludeFilters = {
@ComponentScan.Filter(type = FilterType.ASSIGNABLE_TYPE, classes = ScheduledAnalyticsFactory.class)
})
public class UpgradeCliApplication {
public static void main(String[] args) {
new SpringApplicationBuilder(UpgradeCliApplication.class, UpgradeCli.class).web(WebApplicationType.NONE).run(args);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package com.linkedin.datahub.upgrade;

import com.linkedin.datahub.upgrade.restoreindices.RestoreIndices;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.ActiveProfiles;
import org.springframework.test.context.testng.AbstractTestNGSpringContextTests;
import org.testng.annotations.Test;

import javax.inject.Named;

import static org.testng.AssertJUnit.assertEquals;

/**
 * Boots the upgrade CLI's Spring context (with {@link UpgradeCliApplicationTestConfiguration}
 * mocking out external services) to verify that the application wiring succeeds.
 */
@ActiveProfiles("test")
@SpringBootTest(classes = {UpgradeCliApplication.class, UpgradeCliApplicationTestConfiguration.class})
public class UpgradeCliApplicationTest extends AbstractTestNGSpringContextTests {

  // Injected by name from the Spring context built for this test.
  @Autowired
  @Named("restoreIndices")
  private RestoreIndices restoreIndices;

  @Test
  public void testKafkaHealthCheck() {
    /*
     * This might seem like a simple test, however merely constructing the context exercises the
     * Spring autowiring of the kafka health check bean. The assertion then pins the expected
     * number of steps in the RestoreIndices upgrade.
     * NOTE(review): the expected step count of 3 is asserted here but defined elsewhere in the
     * project — confirm against RestoreIndices if this test starts failing.
     */
    assertEquals(3, restoreIndices.steps().size());
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package com.linkedin.datahub.upgrade;

import com.linkedin.gms.factory.auth.SystemAuthenticationFactory;
import com.linkedin.metadata.entity.EntityService;
import com.linkedin.metadata.graph.GraphService;
import com.linkedin.metadata.models.registry.ConfigEntityRegistry;
import com.linkedin.metadata.models.registry.EntityRegistry;
import com.linkedin.metadata.search.SearchService;
import io.ebean.EbeanServer;
import org.springframework.boot.test.context.TestConfiguration;
import org.springframework.boot.test.mock.mockito.MockBean;
import org.springframework.context.annotation.Import;

/**
 * Test configuration for {@link UpgradeCliApplicationTest}: replaces the external-service beans
 * (database, search, graph, registries) with Mockito mocks so the Spring context can start
 * without real infrastructure. Imports the real {@link SystemAuthenticationFactory}.
 */
@TestConfiguration
@Import(value = {SystemAuthenticationFactory.class})
public class UpgradeCliApplicationTestConfiguration {

  // Mocked so the CLI runner itself is not executed during context startup.
  @MockBean
  private UpgradeCli upgradeCli;

  @MockBean
  private EbeanServer ebeanServer;

  @MockBean
  private EntityService entityService;

  @MockBean
  private SearchService searchService;

  @MockBean
  private GraphService graphService;

  @MockBean
  private EntityRegistry entityRegistry;

  // Was package-private; made private for consistency with the other mock fields above.
  @MockBean
  private ConfigEntityRegistry configEntityRegistry;
}
21 changes: 14 additions & 7 deletions docker/kafka-setup/kafka-setup.sh
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@

# Default: create the analytics (usage event) topic unless explicitly disabled.
: ${DATAHUB_ANALYTICS_ENABLED:=true}

# Cap the JVM heap for the kafka CLI tools spawned below; the setup job
# runs in a small container, so keep the default modest.
: ${KAFKA_HEAP_OPTS:=-Xmx64M}

# Connection/security properties assembled earlier in this script and passed
# to every kafka-* CLI invocation via --command-config.
CONNECTION_PROPERTIES_PATH=/tmp/connection.properties

function wait_ex {
Expand Down Expand Up @@ -60,26 +62,31 @@ if [[ -n "$KAFKA_PROPERTIES_SASL_CLIENT_CALLBACK_HANDLER_CLASS" ]]; then
fi

# Block until at least 1 broker is reachable, timing out after 180s.
cub kafka-ready -c $CONNECTION_PROPERTIES_PATH -b $KAFKA_BOOTSTRAP_SERVER 1 180

# Group 1: create the core event topics in parallel (backgrounded with '&'),
# then wait for all of them via wait_ex (defined earlier in this script) and
# abort the whole setup if any creation failed.
kafka-topics.sh --create --if-not-exists --command-config $CONNECTION_PROPERTIES_PATH --bootstrap-server $KAFKA_BOOTSTRAP_SERVER --partitions $PARTITIONS --replication-factor $REPLICATION_FACTOR --topic $METADATA_AUDIT_EVENT_NAME &
kafka-topics.sh --create --if-not-exists --command-config $CONNECTION_PROPERTIES_PATH --bootstrap-server $KAFKA_BOOTSTRAP_SERVER --partitions $PARTITIONS --replication-factor $REPLICATION_FACTOR --topic $METADATA_CHANGE_EVENT_NAME &
kafka-topics.sh --create --if-not-exists --command-config $CONNECTION_PROPERTIES_PATH --bootstrap-server $KAFKA_BOOTSTRAP_SERVER --partitions $PARTITIONS --replication-factor $REPLICATION_FACTOR --topic $FAILED_METADATA_CHANGE_EVENT_NAME &
kafka-topics.sh --create --if-not-exists --command-config $CONNECTION_PROPERTIES_PATH --bootstrap-server $KAFKA_BOOTSTRAP_SERVER --partitions $PARTITIONS --replication-factor $REPLICATION_FACTOR --topic $METADATA_CHANGE_LOG_VERSIONED_TOPIC &
echo "Waiting for topic creation group 1."
result=$(wait_ex)
rc=$?
if [ $rc -ne 0 ]; then exit $rc; fi
echo "Finished topic creation group 1."

# Group 2: remaining change-proposal/platform topics, created in parallel.
# Set retention to 90 days (7776000000 ms) on the timeseries change-log topic only.
kafka-topics.sh --create --if-not-exists --command-config $CONNECTION_PROPERTIES_PATH --bootstrap-server $KAFKA_BOOTSTRAP_SERVER --partitions $PARTITIONS --replication-factor $REPLICATION_FACTOR --config retention.ms=7776000000 --topic $METADATA_CHANGE_LOG_TIMESERIES_TOPIC &
kafka-topics.sh --create --if-not-exists --command-config $CONNECTION_PROPERTIES_PATH --bootstrap-server $KAFKA_BOOTSTRAP_SERVER --partitions $PARTITIONS --replication-factor $REPLICATION_FACTOR --topic $METADATA_CHANGE_PROPOSAL_TOPIC &
kafka-topics.sh --create --if-not-exists --command-config $CONNECTION_PROPERTIES_PATH --bootstrap-server $KAFKA_BOOTSTRAP_SERVER --partitions $PARTITIONS --replication-factor $REPLICATION_FACTOR --topic $FAILED_METADATA_CHANGE_PROPOSAL_TOPIC &
kafka-topics.sh --create --if-not-exists --command-config $CONNECTION_PROPERTIES_PATH --bootstrap-server $KAFKA_BOOTSTRAP_SERVER --partitions $PARTITIONS --replication-factor $REPLICATION_FACTOR --topic $PLATFORM_EVENT_TOPIC_NAME &
# As in group 1: wait for the backgrounded creations and bail out on any failure.
echo "Waiting for topic creation group 2."
result=$(wait_ex)
rc=$?
if [ $rc -ne 0 ]; then exit $rc; fi
echo "Finished topic creation group 2."

# Create topic for datahub usage event (analytics), only when enabled.
# NOTE(review): the next two lines create the same topic twice — once
# backgrounded with '&' and once in the foreground. This looks like a diff
# rendering artifact (old line + new line shown together); confirm against
# the actual file that only the foreground form remains.
if [[ $DATAHUB_ANALYTICS_ENABLED == true ]]; then
kafka-topics.sh --create --if-not-exists --command-config $CONNECTION_PROPERTIES_PATH --bootstrap-server $KAFKA_BOOTSTRAP_SERVER --partitions $PARTITIONS --replication-factor $REPLICATION_FACTOR --topic $DATAHUB_USAGE_EVENT_NAME &
kafka-topics.sh --create --if-not-exists --command-config $CONNECTION_PROPERTIES_PATH --bootstrap-server $KAFKA_BOOTSTRAP_SERVER --partitions $PARTITIONS --replication-factor $REPLICATION_FACTOR --topic $DATAHUB_USAGE_EVENT_NAME
fi

# Final wait for any still-backgrounded topic creations; propagate failures.
# NOTE(review): this trailing wait block may be removed lines from the old
# script version rendered into this diff view — verify against the real file.
echo "Waiting for topic creation."
result=$(wait_ex)
rc=$?
if [ $rc -ne 0 ]; then exit $rc; fi
echo "Finished topic creation."

# Ensure the schema-registry backing topic (_schemas) uses log compaction,
# which the schema registry requires for correct operation.
kafka-configs.sh --command-config $CONNECTION_PROPERTIES_PATH --bootstrap-server $KAFKA_BOOTSTRAP_SERVER --entity-type topics --entity-name _schemas --alter --add-config cleanup.policy=compact

0 comments on commit 821bff8

Please sign in to comment.