From 825e5273d0082929421300960eb1603d27b933da Mon Sep 17 00:00:00 2001 From: Christopher Wirick Date: Wed, 1 Jul 2020 09:40:05 -0700 Subject: [PATCH 01/19] BaseIT & JobCoordinatorIT --- Makefile | 5 +- core/pom.xml | 44 +- .../core/config/FeatureStreamConfig.java | 17 +- .../java/feast/core/config/JobConfig.java | 4 +- .../feast/core/config/MonitoringConfig.java | 2 + .../core/model/AbstractTimestampEntity.java | 2 +- .../core/service/JobCoordinatorService.java | 4 +- core/src/main/resources/application.yml | 2 +- core/src/test/java/feast/core/it/BaseIT.java | 172 ++++++++ .../java/feast/core/it/DataGenerator.java | 66 +++ .../java/feast/core/it/SimpleAPIClient.java | 110 +++++ .../feast/core/service/JobCoordinatorIT.java | 395 ++++++++++++++++++ .../java/feast/core/service/SimpleIT.java | 29 ++ .../test/resources/application-it.properties | 31 ++ pom.xml | 55 ++- sdk/java/pom.xml | 6 - serving/pom.xml | 6 + 17 files changed, 932 insertions(+), 18 deletions(-) create mode 100644 core/src/test/java/feast/core/it/BaseIT.java create mode 100644 core/src/test/java/feast/core/it/DataGenerator.java create mode 100644 core/src/test/java/feast/core/it/SimpleAPIClient.java create mode 100644 core/src/test/java/feast/core/service/JobCoordinatorIT.java create mode 100644 core/src/test/java/feast/core/service/SimpleIT.java create mode 100644 core/src/test/resources/application-it.properties diff --git a/Makefile b/Makefile index 27c5c9954f..c074d15975 100644 --- a/Makefile +++ b/Makefile @@ -46,6 +46,9 @@ lint-java: test-java: mvn --no-transfer-progress test +test-java-integration: + mvn -DskipUTs test-compile failsafe:integration-test failsafe:verify + test-java-with-coverage: mvn --no-transfer-progress test jacoco:report-aggregate @@ -175,4 +178,4 @@ build-html: clean-html # Versions lint-versions: - ./infra/scripts/validate-version-consistency.sh \ No newline at end of file + ./infra/scripts/validate-version-consistency.sh diff --git a/core/pom.xml b/core/pom.xml index 8a6ab32618..7b1a4ce65c 100644 --- a/core/pom.xml +++ b/core/pom.xml @@ -312,8 +312,50 @@ org.springframework spring-test - 5.1.3.RELEASE + 5.2.5.RELEASE test + + org.junit.jupiter + junit-jupiter + RELEASE + test + + + org.testcontainers + testcontainers + 1.14.3 + test + + + org.testcontainers + junit-jupiter + 1.14.3 + test + + + org.testcontainers + postgresql + 1.14.3 + test + + + org.testcontainers + kafka + 1.14.3 + test + + + org.awaitility + awaitility + 3.0.0 + test + + + org.awaitility + awaitility-proxy + 3.0.0 + test + diff --git a/core/src/main/java/feast/core/config/FeatureStreamConfig.java b/core/src/main/java/feast/core/config/FeatureStreamConfig.java index 795e4f754f..40778b5be3 100644 --- a/core/src/main/java/feast/core/config/FeatureStreamConfig.java +++ b/core/src/main/java/feast/core/config/FeatureStreamConfig.java @@ -37,7 +37,10 @@ import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; +import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; +import org.springframework.kafka.config.KafkaListenerContainerFactory; import org.springframework.kafka.core.*; +import org.springframework.kafka.listener.ConcurrentMessageListenerContainer; @Slf4j @Configuration @@ -113,7 +116,19 @@ public KafkaTemplate specKafkaTemplate( } @Bean - public ConsumerFactory ackConsumerFactory(FeastProperties feastProperties) { + KafkaListenerContainerFactory< + ConcurrentMessageListenerContainer> + kafkaAckListenerContainerFactory( + ConsumerFactory consumerFactory) { + ConcurrentKafkaListenerContainerFactory factory = + new ConcurrentKafkaListenerContainerFactory<>(); + factory.setConsumerFactory(consumerFactory); + return factory; + } + + @Bean + public ConsumerFactory ackConsumerFactory( + FeastProperties feastProperties) { StreamProperties streamProperties = feastProperties.getStream(); Map props = new HashMap<>(); diff --git a/core/src/main/java/feast/core/config/JobConfig.java b/core/src/main/java/feast/core/config/JobConfig.java index be240d20e9..d8f8ae3e69 100644 --- a/core/src/main/java/feast/core/config/JobConfig.java +++ b/core/src/main/java/feast/core/config/JobConfig.java @@ -34,7 +34,7 @@ import feast.proto.core.SourceProto; import java.util.Map; import lombok.extern.slf4j.Slf4j; -import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @@ -94,7 +94,7 @@ public JobGroupingStrategy getJobGroupingStrategy( * @param feastProperties feast config properties */ @Bean - @Autowired + @ConditionalOnMissingBean public JobManager getJobManager( FeastProperties feastProperties, IngestionJobProto.SpecsStreamingUpdateConfig specsStreamingUpdateConfig) diff --git a/core/src/main/java/feast/core/config/MonitoringConfig.java b/core/src/main/java/feast/core/config/MonitoringConfig.java index 53c9562c47..b3e9757454 100644 --- a/core/src/main/java/feast/core/config/MonitoringConfig.java +++ b/core/src/main/java/feast/core/config/MonitoringConfig.java @@ -23,11 +23,13 @@ import io.prometheus.client.exporter.MetricsServlet; import javax.servlet.http.HttpServlet; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.boot.web.servlet.ServletRegistrationBean; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration +@ConditionalOnProperty("management.metrics.export.simple.enabled") public class MonitoringConfig { private static final String PROMETHEUS_METRICS_PATH = "/metrics"; diff --git a/core/src/main/java/feast/core/model/AbstractTimestampEntity.java b/core/src/main/java/feast/core/model/AbstractTimestampEntity.java index 7a544a9cbe..188f19b5bc 100644 --- a/core/src/main/java/feast/core/model/AbstractTimestampEntity.java +++ b/core/src/main/java/feast/core/model/AbstractTimestampEntity.java @@ -33,7 +33,7 @@ public abstract class AbstractTimestampEntity { private Date created; @Temporal(TemporalType.TIMESTAMP) - @Column(name = "lastUpdated", nullable = false) + @Column(name = "last_updated", nullable = false) private Date lastUpdated; @PrePersist diff --git a/core/src/main/java/feast/core/service/JobCoordinatorService.java b/core/src/main/java/feast/core/service/JobCoordinatorService.java index 59cc619fa6..9b35383145 100644 --- a/core/src/main/java/feast/core/service/JobCoordinatorService.java +++ b/core/src/main/java/feast/core/service/JobCoordinatorService.java @@ -408,7 +408,9 @@ public void notifyJobsWhenFeatureSetUpdated() { * * @param record ConsumerRecord with key: FeatureSet reference and value: Ack message */ - @KafkaListener(topics = {"${feast.stream.specsOptions.specsAckTopic}"}) + @KafkaListener( + topics = {"${feast.stream.specsOptions.specsAckTopic}"}, + containerFactory = "kafkaAckListenerContainerFactory") @Transactional public void listenAckFromJobs( ConsumerRecord record) { diff --git a/core/src/main/resources/application.yml b/core/src/main/resources/application.yml index fbdf603632..139b700fd8 100644 --- a/core/src/main/resources/application.yml +++ b/core/src/main/resources/application.yml @@ -125,7 +125,7 @@ management: metrics: export: simple: - enabled: false + enabled: true statsd: enabled: true host: ${STATSD_HOST:localhost} diff --git a/core/src/test/java/feast/core/it/BaseIT.java b/core/src/test/java/feast/core/it/BaseIT.java new file mode 100644 index 0000000000..aac1a4c340 --- /dev/null +++ b/core/src/test/java/feast/core/it/BaseIT.java @@ -0,0 +1,172 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * Copyright 2018-2020 The Feast Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package feast.core.it; + +import feast.core.config.FeastProperties; +import feast.core.util.KafkaSerialization; +import feast.proto.core.IngestionJobProto; +import java.sql.Connection; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; +import javax.persistence.EntityManager; +import javax.persistence.PersistenceContext; +import javax.persistence.Table; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.common.serialization.ByteArrayDeserializer; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.apache.kafka.common.serialization.StringSerializer; +import org.hibernate.engine.spi.SessionImplementor; +import org.junit.jupiter.api.*; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.context.annotation.Bean; +import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; +import org.springframework.kafka.config.KafkaListenerContainerFactory; +import org.springframework.kafka.core.ConsumerFactory; +import org.springframework.kafka.core.DefaultKafkaConsumerFactory; +import org.springframework.kafka.core.DefaultKafkaProducerFactory; +import org.springframework.kafka.core.KafkaTemplate; +import org.springframework.kafka.listener.ConcurrentMessageListenerContainer; +import org.springframework.test.annotation.DirtiesContext; +import org.springframework.test.context.ActiveProfiles; +import org.springframework.test.context.DynamicPropertyRegistry; +import org.springframework.test.context.DynamicPropertySource; +import org.testcontainers.containers.KafkaContainer; +import org.testcontainers.containers.PostgreSQLContainer; +import org.testcontainers.junit.jupiter.Container; +import org.testcontainers.junit.jupiter.Testcontainers; + +@SpringBootTest +@ActiveProfiles("it") +@Testcontainers +@DirtiesContext(classMode = DirtiesContext.ClassMode.AFTER_CLASS) +public class BaseIT { + + @Container public static PostgreSQLContainer postgreSQLContainer = new PostgreSQLContainer<>(); + + @Container public static KafkaContainer kafka = new KafkaContainer(); + + @DynamicPropertySource + static void properties(DynamicPropertyRegistry registry) { + + registry.add("spring.datasource.url", postgreSQLContainer::getJdbcUrl); + registry.add("spring.datasource.username", postgreSQLContainer::getUsername); + registry.add("spring.datasource.password", postgreSQLContainer::getPassword); + registry.add("spring.jpa.hibernate.ddl-auto", () -> "none"); + + registry.add("feast.stream.options.bootstrapServers", kafka::getBootstrapServers); + } + + @TestInstance(TestInstance.Lifecycle.PER_CLASS) + public class SequentialFlow { + @AfterAll + public void tearDown() throws SQLException { + cleanTables(entityManager); + } + } + + public static class BaseTestConfig { + @Bean + public KafkaListenerContainerFactory> + testListenerContainerFactory(ConsumerFactory consumerFactory) { + ConcurrentKafkaListenerContainerFactory factory = + new ConcurrentKafkaListenerContainerFactory<>(); + factory.setConsumerFactory(consumerFactory); + return factory; + } + + @Bean + public ConsumerFactory testConsumerFactory() { + Map props = new HashMap<>(); + + props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka.getBootstrapServers()); + props.put(ConsumerConfig.GROUP_ID_CONFIG, "test"); + + return new DefaultKafkaConsumerFactory<>( + props, new StringDeserializer(), new ByteArrayDeserializer()); + } + + @Bean + public KafkaTemplate specAckKafkaTemplate( + FeastProperties feastProperties) { + FeastProperties.StreamProperties streamProperties = feastProperties.getStream(); + Map props = new HashMap<>(); + + props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka.getBootstrapServers()); + + KafkaTemplate t = + new KafkaTemplate<>( + new DefaultKafkaProducerFactory<>( + props, new StringSerializer(), new KafkaSerialization.ProtoSerializer<>())); + t.setDefaultTopic(streamProperties.getSpecsOptions().getSpecsAckTopic()); + return t; + } + } + + public static void cleanTables(EntityManager em) throws SQLException { + List tableNames = + em.getMetamodel().getEntities().stream() + .map(e -> e.getJavaType().getAnnotation(Table.class).name()) + .collect(Collectors.toList()); + + // this trick needed to get EntityManager with Transaction + // and we don't want to wrap whole class into @Transactional + em = em.getEntityManagerFactory().createEntityManager(); + // Transaction needed only once to do unwrap + SessionImplementor session = em.unwrap(SessionImplementor.class); + + // and here we're actually don't want any transactions + // but instead we pulling raw connection + // to be able to retry query if needed + // since retrying rollbacked transaction is not that easy + Connection connection = session.connection(); + + // retries are needed since truncate require exclusive lock + // and that often leads to Deadlock + // since SpringApp is still running in another thread + var num_retries = 5; + for (var i = 0; i < num_retries; i++) { + try { + Statement statement = connection.createStatement(); + statement.execute(String.format("truncate %s cascade", String.join(", ", tableNames))); + } catch (SQLException e) { + continue; + } + + break; + } + } + + @PersistenceContext EntityManager entityManager; + + public Boolean isNestedTest(TestInfo testInfo) { + return testInfo.getTestClass().get().getAnnotation(Nested.class) != null; + } + + @AfterEach + public void tearDown(TestInfo testInfo) throws SQLException { + if (isNestedTest(testInfo)) { + return; + } + + cleanTables(entityManager); + } +} diff --git a/core/src/test/java/feast/core/it/DataGenerator.java b/core/src/test/java/feast/core/it/DataGenerator.java new file mode 100644 index 0000000000..28991e32d2 --- /dev/null +++ b/core/src/test/java/feast/core/it/DataGenerator.java @@ -0,0 +1,66 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * Copyright 2018-2020 The Feast Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package feast.core.it; + +import com.google.common.collect.ImmutableList; +import feast.proto.core.SourceProto; +import feast.proto.core.StoreProto; +import java.util.List; +import java.util.stream.Collectors; +import org.apache.commons.lang3.tuple.Triple; + +public class DataGenerator { + // projectName, featureName, exclude + public static Triple defaultSubscription = Triple.of("*", "*", false); + + public static StoreProto.Store defaultStore = + createStore( + "test-store", StoreProto.Store.StoreType.REDIS, ImmutableList.of(defaultSubscription)); + + public static SourceProto.Source defaultSource = createSource("localhost", "topic"); + + public static SourceProto.Source createSource(String server, String topic) { + return SourceProto.Source.newBuilder() + .setType(SourceProto.SourceType.KAFKA) + .setKafkaSourceConfig( + SourceProto.KafkaSourceConfig.newBuilder() + .setBootstrapServers(server) + .setTopic(topic) + .build()) + .build(); + } + + public static StoreProto.Store createStore( + String name, + StoreProto.Store.StoreType type, + List> subscriptions) { + return StoreProto.Store.newBuilder() + .addAllSubscriptions( + subscriptions.stream() + .map( + s -> + StoreProto.Store.Subscription.newBuilder() + .setProject(s.getLeft()) + .setName(s.getMiddle()) + .setExclude(s.getRight()) + .build()) + .collect(Collectors.toList())) + .setName(name) + .setType(type) + .build(); + } +} diff --git a/core/src/test/java/feast/core/it/SimpleAPIClient.java b/core/src/test/java/feast/core/it/SimpleAPIClient.java new file mode 100644 index 0000000000..6fc7a1fa09 --- /dev/null +++ b/core/src/test/java/feast/core/it/SimpleAPIClient.java @@ -0,0 +1,110 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * Copyright 2018-2020 The Feast Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package feast.core.it; + +import feast.proto.core.*; +import feast.proto.types.ValueProto; +import java.util.List; +import java.util.stream.Collectors; +import org.apache.commons.lang3.tuple.Pair; + +public class SimpleAPIClient { + private CoreServiceGrpc.CoreServiceBlockingStub stub; + + public SimpleAPIClient(CoreServiceGrpc.CoreServiceBlockingStub stub) { + this.stub = stub; + } + + public void simpleApplyFeatureSet( + SourceProto.Source source, + String projectName, + String name, + List> entities, + List> features) { + stub.applyFeatureSet( + CoreServiceProto.ApplyFeatureSetRequest.newBuilder() + .setFeatureSet( + FeatureSetProto.FeatureSet.newBuilder() + .setSpec( + FeatureSetProto.FeatureSetSpec.newBuilder() + .setSource(source) + .setName(name) + .setProject(projectName) + .addAllEntities( + entities.stream() + .map( + pair -> + FeatureSetProto.EntitySpec.newBuilder() + .setName(pair.getLeft()) + .setValueType(pair.getRight()) + .build()) + .collect(Collectors.toList())) + .addAllFeatures( + features.stream() + .map( + pair -> + FeatureSetProto.FeatureSpec.newBuilder() + .setName(pair.getLeft()) + .setValueType(pair.getRight()) + .build()) + .collect(Collectors.toList())) + .build()) + .build()) + .build()); + } + + public List simpleListFeatureSets(String name) { + return stub.listFeatureSets( + CoreServiceProto.ListFeatureSetsRequest.newBuilder() + .setFilter( + CoreServiceProto.ListFeatureSetsRequest.Filter.newBuilder() + .setFeatureSetName(name) + .build()) + .build()) + .getFeatureSetsList(); + } + + public FeatureSetProto.FeatureSet simpleGetFeatureSet(String projectName, String name) { + return stub.getFeatureSet( + CoreServiceProto.GetFeatureSetRequest.newBuilder() + .setName(name) + .setProject(projectName) + .build()) + .getFeatureSet(); + } + + public void updateStore(StoreProto.Store store) { + stub.updateStore(CoreServiceProto.UpdateStoreRequest.newBuilder().setStore(store).build()); + } + + public void createProject(String name) { + stub.createProject(CoreServiceProto.CreateProjectRequest.newBuilder().setName(name).build()); + } + + public void restartIngestionJob(String jobId) { + stub.restartIngestionJob( + CoreServiceProto.RestartIngestionJobRequest.newBuilder().setId(jobId).build()); + } + + public List listIngestionJobs() { + return stub.listIngestionJobs( + CoreServiceProto.ListIngestionJobsRequest.newBuilder() + .setFilter(CoreServiceProto.ListIngestionJobsRequest.Filter.newBuilder().build()) + .build()) + .getJobsList(); + } +} diff --git a/core/src/test/java/feast/core/service/JobCoordinatorIT.java b/core/src/test/java/feast/core/service/JobCoordinatorIT.java new file mode 100644 index 0000000000..e6423dd8a4 --- /dev/null +++ b/core/src/test/java/feast/core/service/JobCoordinatorIT.java @@ -0,0 +1,395 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * Copyright 2018-2020 The Feast Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package feast.core.service; + +import static org.awaitility.Awaitility.await; +import static org.hamcrest.CoreMatchers.*; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.beans.HasPropertyWithValue.hasProperty; +import static org.hamcrest.collection.IsCollectionWithSize.hasSize; +import static org.hamcrest.collection.IsIterableContainingInAnyOrder.containsInAnyOrder; +import static org.hamcrest.core.AllOf.allOf; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Lists; +import com.google.protobuf.InvalidProtocolBufferException; +import feast.core.dao.JobRepository; +import feast.core.it.BaseIT; +import feast.core.it.DataGenerator; +import feast.core.it.SimpleAPIClient; +import feast.core.job.JobManager; +import feast.core.job.Runner; +import feast.core.model.*; +import feast.proto.core.*; +import feast.proto.types.ValueProto; +import io.grpc.ManagedChannel; +import io.grpc.ManagedChannelBuilder; +import java.util.*; +import org.apache.commons.lang3.tuple.Pair; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.junit.jupiter.api.*; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.boot.test.context.TestConfiguration; +import org.springframework.context.annotation.Bean; +import org.springframework.kafka.annotation.KafkaListener; +import org.springframework.kafka.core.KafkaTemplate; + +@SpringBootTest( + properties = { + "feast.jobs.enabled=true", + "feast.jobs.polling_interval_milliseconds=2000", + "feast.stream.specsOptions.notifyIntervalMilliseconds=100", + "feast.jobs.consolidate-jobs-per-source=true" + }) +public class JobCoordinatorIT extends BaseIT { + @Autowired private FakeJobManager jobManager; + + @Autowired private JobRepository jobRepository; + + @Autowired KafkaTemplate ackPublisher; + + static CoreServiceGrpc.CoreServiceBlockingStub stub; + static List specsMailbox = new ArrayList<>(); + static SimpleAPIClient apiClient; + + @BeforeAll + public static void globalSetUp(@Value("${grpc.server.port}") int port) { + ManagedChannel channel = + ManagedChannelBuilder.forAddress("localhost", port).usePlaintext().build(); + stub = CoreServiceGrpc.newBlockingStub(channel); + apiClient = new SimpleAPIClient(stub); + } + + @BeforeEach + public void setUp(TestInfo testInfo) { + apiClient.updateStore(DataGenerator.defaultStore); + + specsMailbox = new ArrayList<>(); + + if (!isNestedTest(testInfo)) { + jobManager.cleanAll(); + } + } + + @KafkaListener( + topics = {"${feast.stream.specsOptions.specsTopic}"}, + containerFactory = "testListenerContainerFactory") + public void listenSpecs(ConsumerRecord record) + throws InvalidProtocolBufferException { + FeatureSetProto.FeatureSetSpec featureSetSpec = + FeatureSetProto.FeatureSetSpec.parseFrom(record.value()); + specsMailbox.add(featureSetSpec); + } + + @Test + public void shouldCreateJobForNewSource() { + apiClient.simpleApplyFeatureSet( + DataGenerator.defaultSource, + "default", + "test", + Collections.emptyList(), + Collections.emptyList()); + + List featureSets = apiClient.simpleListFeatureSets("*"); + assertThat(featureSets.size(), equalTo(1)); + + await() + .until( + jobManager::getAllJobs, + hasItem( + allOf( + hasProperty("runner", equalTo(Runner.DIRECT)), + hasProperty("id", containsString("kafka--627317556")), + hasProperty("jobStores", hasSize(1)), + hasProperty("featureSetJobStatuses", hasSize(1))))); + } + + @Test + public void shouldUpgradeJobWhenStoreChanged() { + apiClient.simpleApplyFeatureSet( + DataGenerator.defaultSource, + "project", + "test", + Collections.emptyList(), + Collections.emptyList()); + + await().until(jobManager::getAllJobs, hasSize(1)); + + apiClient.updateStore( + DataGenerator.createStore( + "new-store", + StoreProto.Store.StoreType.REDIS, + ImmutableList.of(DataGenerator.defaultSubscription))); + + await() + .until( + jobManager::getAllJobs, + containsInAnyOrder( + allOf( + hasProperty("jobStores", hasSize(2)), + hasProperty("featureSetJobStatuses", hasSize(1))))); + + await().until(jobManager::getAllJobs, hasSize(1)); + } + + @Test + public void shouldRestoreJobThatStopped() { + apiClient.simpleApplyFeatureSet( + DataGenerator.defaultSource, + "project", + "test", + Collections.emptyList(), + Collections.emptyList()); + + await().until(jobManager::getAllJobs, hasSize(1)); + Job job = jobRepository.findByStatus(JobStatus.RUNNING).get(0); + + List ingestionJobs = apiClient.listIngestionJobs(); + assertThat(ingestionJobs, hasSize(1)); + assertThat(ingestionJobs, containsInAnyOrder(hasProperty("id", equalTo(job.getId())))); + + apiClient.restartIngestionJob(ingestionJobs.get(0).getId()); + + await().until(() -> jobManager.getJobStatus(job), equalTo(JobStatus.ABORTED)); + + await() + .until( + apiClient::listIngestionJobs, + hasItem( + allOf( + hasProperty("status", equalTo(IngestionJobProto.IngestionJobStatus.RUNNING)), + hasProperty("id", not(ingestionJobs.get(0).getId()))))); + } + + @TestMethodOrder(MethodOrderer.OrderAnnotation.class) + @Nested + class SpecNotificationFlow extends SequentialFlow { + Job job; + + @Test + @Order(1) + public void shouldSendNewSpec() { + jobManager.cleanAll(); + + job = + Job.builder() + .setSource(Source.fromProto(DataGenerator.defaultSource)) + .setId("some-running-id") + .setStatus(JobStatus.RUNNING) + .build(); + + jobManager.startJob(job); + + job.setStores(ImmutableSet.of(Store.fromProto(DataGenerator.defaultStore))); + jobRepository.saveAndFlush(job); + + apiClient.simpleApplyFeatureSet( + DataGenerator.defaultSource, + "default", + "test", + ImmutableList.of(Pair.of("entity", ValueProto.ValueType.Enum.BOOL)), + Collections.emptyList()); + + FeatureSetProto.FeatureSet featureSet = apiClient.simpleGetFeatureSet("default", "test"); + + assertThat( + featureSet.getMeta().getStatus(), + equalTo(FeatureSetProto.FeatureSetStatus.STATUS_PENDING)); + + await().until(() -> specsMailbox, hasSize(1)); + + assertThat( + specsMailbox.get(0), + allOf( + hasProperty("project", equalTo("default")), + hasProperty("name", equalTo("test")), + hasProperty("entitiesList", hasSize(1)), + hasProperty("version", equalTo(1)))); + + assertThat(jobRepository.findByStatus(JobStatus.RUNNING), hasSize(1)); + } + + @Test + @Order(2) + public void shouldUpdateSpec() { + apiClient.simpleApplyFeatureSet( + DataGenerator.defaultSource, + "default", + "test", + ImmutableList.of(Pair.of("entity", ValueProto.ValueType.Enum.BOOL)), + ImmutableList.of(Pair.of("feature", ValueProto.ValueType.Enum.INT32))); + + await().until(() -> specsMailbox, hasSize(1)); + + assertThat( + specsMailbox.get(0), + allOf( + hasProperty("project", equalTo("default")), + hasProperty("name", equalTo("test")), + hasProperty("featuresList", hasSize(1)), + hasProperty("version", equalTo(2)))); + } + + @Test + @Order(3) + public void shouldIgnoreOutdatedACKs() throws InterruptedException { + ackPublisher.sendDefault( + "default/test", + IngestionJobProto.FeatureSetSpecAck.newBuilder() + .setFeatureSetVersion(1) + .setJobName(job.getId()) + .setFeatureSetReference("default/test") + .build()); + + // time to process + Thread.sleep(1000); + + FeatureSetProto.FeatureSet featureSet = apiClient.simpleGetFeatureSet("default", "test"); + + assertThat( + featureSet.getMeta().getStatus(), + equalTo(FeatureSetProto.FeatureSetStatus.STATUS_PENDING)); + } + + @Test + @Order(4) + public void shouldUpdateDeliveryStatus() { + ackPublisher.sendDefault( + "default/test", + IngestionJobProto.FeatureSetSpecAck.newBuilder() + .setFeatureSetVersion(2) + .setJobName(job.getId()) + .setFeatureSetReference("default/test") + .build()); + + await() + .until( + () -> apiClient.simpleGetFeatureSet("default", "test").getMeta().getStatus(), + equalTo(FeatureSetProto.FeatureSetStatus.STATUS_READY)); + } + + @Test + @Order(5) + public void shouldReallocateFeatureSetAfterSourceChanged() { + assertThat(jobManager.getJobStatus(job), equalTo(JobStatus.RUNNING)); + + apiClient.simpleApplyFeatureSet( + DataGenerator.createSource("localhost", "newTopic"), + "default", + "test", + ImmutableList.of(Pair.of("entity", ValueProto.ValueType.Enum.BOOL)), + ImmutableList.of(Pair.of("feature", ValueProto.ValueType.Enum.INT32))); + + await().until(() -> jobManager.getJobStatus(job), equalTo(JobStatus.ABORTED)); + + await().until(() -> jobRepository.findByStatus(JobStatus.RUNNING), hasSize(1)); + + assertThat( + specsMailbox.get(0), + allOf( + hasProperty("project", equalTo("default")), + hasProperty("name", equalTo("test")), + hasProperty("version", equalTo(3)))); + } + + @Test + @Order(6) + public void shouldUpdateStatusAfterACKfromNewJob() { + job = jobRepository.findByStatus(JobStatus.RUNNING).get(0); + + ackPublisher.sendDefault( + "default/test", + IngestionJobProto.FeatureSetSpecAck.newBuilder() + .setFeatureSetVersion(3) + .setJobName(job.getId()) + .setFeatureSetReference("default/test") + .build()); + + await() + .until( + () -> apiClient.simpleGetFeatureSet("default", "test").getMeta().getStatus(), + equalTo(FeatureSetProto.FeatureSetStatus.STATUS_READY)); + } + } + + public static class FakeJobManager implements JobManager { + private final Map state; + + public FakeJobManager() { + state = new HashMap<>(); + } + + @Override + public Runner getRunnerType() { + return Runner.DIRECT; + } + + @Override + public Job startJob(Job job) { + String extId = UUID.randomUUID().toString(); + job.setExtId(extId); + job.setStatus(JobStatus.RUNNING); + state.put(extId, job); + return job; + } + + @Override + public Job updateJob(Job job) { + return job; + } + + @Override + public Job abortJob(Job job) { + job.setStatus(JobStatus.ABORTING); + state.remove(job.getExtId()); + return job; + } + + @Override + public Job restartJob(Job job) { + return abortJob(job); + } + + @Override + public JobStatus getJobStatus(Job job) { + if (state.containsKey(job.getExtId())) { + return JobStatus.RUNNING; + } + + return JobStatus.ABORTED; + } + + public List getAllJobs() { + return Lists.newArrayList(state.values()); + } + + public void cleanAll() { + state.clear(); + } + } + + @TestConfiguration + public static class TestConfig extends BaseTestConfig { + @Bean + public JobManager getJobManager() { + return new FakeJobManager(); + } + } +} diff --git a/core/src/test/java/feast/core/service/SimpleIT.java b/core/src/test/java/feast/core/service/SimpleIT.java new file mode 100644 index 0000000000..54809d423c --- /dev/null +++ b/core/src/test/java/feast/core/service/SimpleIT.java @@ -0,0 +1,29 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * Copyright 2018-2020 The Feast Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package feast.core.service; + +import feast.core.it.BaseIT; +import org.junit.jupiter.api.Test; +import org.springframework.boot.test.context.SpringBootTest; + +@SpringBootTest() +public class SimpleIT extends BaseIT { + @Test + public void test() { + assert true; + } +} diff --git a/core/src/test/resources/application-it.properties b/core/src/test/resources/application-it.properties new file mode 100644 index 0000000000..28ff612a09 --- /dev/null +++ b/core/src/test/resources/application-it.properties @@ -0,0 +1,31 @@ +# +# Copyright 2018 The Feast Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# +grpc.server.port=${GRPC_PORT:6666} + +feast.security.authentication.enabled = false +feast.security.authorization.enabled = false + +feast.jobs.enabled=false + +spring.jpa.properties.hibernate.format_sql=true +spring.jpa.properties.hibernate.show_sql=false +spring.jpa.hibernate.naming.physical-strategy=org.springframework.boot.orm.jpa.hibernate.SpringPhysicalNamingStrategy +spring.jpa.hibernate.ddl-auto=none + +spring.main.allow-bean-definition-overriding=true + +management.metrics.export.simple.enabled=false diff --git a/pom.xml b/pom.xml index dd2e0c3d41..529b60b7dc 100644 --- a/pom.xml +++ b/pom.xml @@ -68,6 +68,8 @@ 6.0.8 2.9.9 2.0.2 + + false @@ -526,16 +528,61 @@ + + org.codehaus.mojo + build-helper-maven-plugin + 3.0.0 + org.apache.maven.plugins maven-surefire-plugin - 3.0.0-M4 + 3.0.0-M5 @{argLine} -Xms2048m -Xmx2048m -Djdk.net.URLClassPath.disableClassPathURLCheck=true - - IntegrationTest - + ${skipUTs} + + + + org.junit.vintage + junit-vintage-engine + 5.5.2 + + + + + unit-tests + + test + + + + */test/java/** + + + + + + + maven-failsafe-plugin + 3.0.0-M5 + + + org.junit.jupiter + junit-jupiter-engine + 5.6.2 + + + + + + integration-tests + + integration-test + verify + + + org.springframework.boot diff --git a/sdk/java/pom.xml b/sdk/java/pom.xml index b52119f18e..fa1be41d08 100644 --- a/sdk/java/pom.xml +++ b/sdk/java/pom.xml @@ -103,12 +103,6 @@ ${mockito.version} test - - org.junit.vintage - junit-vintage-engine - ${junit.version} - test - diff --git a/serving/pom.xml b/serving/pom.xml index 986775058d..fbc772f5c8 100644 --- a/serving/pom.xml +++ b/serving/pom.xml @@ -259,6 +259,12 @@ org.springframework.boot spring-boot-starter-test test + + + org.junit.vintage + junit-vintage-engine + + From a47cb9358e2fddad814f2e4d124e2629da1a07f0 Mon Sep 17 00:00:00 2001 From: pyalex Date: Fri, 17 Jul 2020 09:28:44 +0300 Subject: [PATCH 02/19] it action --- .github/workflows/integration_tests.yml | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) create mode 100644 .github/workflows/integration_tests.yml diff --git a/.github/workflows/integration_tests.yml b/.github/workflows/integration_tests.yml new file mode 100644 index 0000000000..053f653ec0 --- /dev/null +++ b/.github/workflows/integration_tests.yml @@ -0,0 +1,16 @@ +name: integration tests +on: [push, pull_request] +jobs: + maven-integration-test: + runs-on: ubuntu-latest + name: Maven Integration Test + steps: + - uses: actions/checkout@v2 + - name: Set up JDK 1.8 + uses: actions/setup-java@v1 + with: + java-version: '11' + java-package: jdk + architecture: x64 + - name: Run integration tests + run: make test-java-integration \ No newline at end of file From 36e27c42faf51a781db4d2c9c92fdc58cffbc324 Mon Sep 17 00:00:00 2001 From: pyalex Date: Fri, 17 Jul 2020 09:36:34 +0300 Subject: [PATCH 03/19] check fail --- core/src/test/java/feast/core/{service => it}/SimpleIT.java | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) rename core/src/test/java/feast/core/{service => it}/SimpleIT.java (91%) diff --git a/core/src/test/java/feast/core/service/SimpleIT.java b/core/src/test/java/feast/core/it/SimpleIT.java similarity index 91% rename from core/src/test/java/feast/core/service/SimpleIT.java rename to core/src/test/java/feast/core/it/SimpleIT.java index 54809d423c..079b36bd6d 100644 --- a/core/src/test/java/feast/core/service/SimpleIT.java +++ b/core/src/test/java/feast/core/it/SimpleIT.java @@ -14,9 +14,8 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package feast.core.service; +package feast.core.it; -import feast.core.it.BaseIT; import org.junit.jupiter.api.Test; import org.springframework.boot.test.context.SpringBootTest; @@ -24,6 +23,6 @@ public class SimpleIT extends BaseIT { @Test public void test() { - assert true; + assert false; } } From 625ef1031cf3414a0ff181c811c959f1d0782be4 Mon Sep 17 00:00:00 2001 From: pyalex Date: Fri, 17 Jul 2020 09:43:58 +0300 Subject: [PATCH 04/19] --no-transfer-progress --- Makefile | 2 +- core/src/test/java/feast/core/it/SimpleIT.java | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/Makefile b/Makefile index c074d15975..00b3dbd810 100644 --- a/Makefile +++ b/Makefile @@ -47,7 +47,7 @@ test-java: mvn --no-transfer-progress test test-java-integration: - mvn -DskipUTs test-compile failsafe:integration-test failsafe:verify + mvn --no-transfer-progress -DskipUTs test-compile failsafe:integration-test failsafe:verify test-java-with-coverage: mvn --no-transfer-progress test jacoco:report-aggregate diff --git a/core/src/test/java/feast/core/it/SimpleIT.java b/core/src/test/java/feast/core/it/SimpleIT.java index 079b36bd6d..b9e866cf1b 100644 --- a/core/src/test/java/feast/core/it/SimpleIT.java +++ b/core/src/test/java/feast/core/it/SimpleIT.java @@ -23,6 +23,6 @@ public class SimpleIT extends BaseIT { @Test public void test() { - assert false; + assert true; } } From cf9c3d34e9355b64e37b96ec42e1eb71a7a984bf Mon Sep 17 00:00:00 2001 From: pyalex Date: Fri, 17 Jul 2020 09:50:25 +0300 Subject: [PATCH 05/19] pr comments --- core/pom.xml | 2 +- core/src/test/java/feast/core/it/BaseIT.java | 2 +- .../test/java/feast/core/it/DataGenerator.java | 18 +++++++++++++++--- .../feast/core/service/JobCoordinatorIT.java | 18 +++++++++--------- .../test/resources/application-it.properties | 2 +- 5 files changed, 27 insertions(+), 15 deletions(-) diff --git a/core/pom.xml b/core/pom.xml index 7b1a4ce65c..fce6ce5eaa 100644 --- a/core/pom.xml +++ b/core/pom.xml @@ -318,7 +318,7 @@ org.junit.jupiter junit-jupiter - RELEASE + 5.6.2 test diff --git a/core/src/test/java/feast/core/it/BaseIT.java b/core/src/test/java/feast/core/it/BaseIT.java index aac1a4c340..839fbf2cad 100644 --- a/core/src/test/java/feast/core/it/BaseIT.java +++ b/core/src/test/java/feast/core/it/BaseIT.java @@ -78,7 +78,7 @@ static void properties(DynamicPropertyRegistry registry) { @TestInstance(TestInstance.Lifecycle.PER_CLASS) public class SequentialFlow { @AfterAll - public void tearDown() throws SQLException { + public void tearDown() throws Exception { cleanTables(entityManager); } } diff --git a/core/src/test/java/feast/core/it/DataGenerator.java b/core/src/test/java/feast/core/it/DataGenerator.java index 28991e32d2..114ceecb9f 100644 --- a/core/src/test/java/feast/core/it/DataGenerator.java +++ b/core/src/test/java/feast/core/it/DataGenerator.java @@ -25,13 +25,25 @@ public class DataGenerator { // projectName, featureName, exclude - public static Triple defaultSubscription = Triple.of("*", "*", false); + static Triple defaultSubscription = Triple.of("*", "*", false); - public static StoreProto.Store defaultStore = + static StoreProto.Store defaultStore = createStore( "test-store", StoreProto.Store.StoreType.REDIS, ImmutableList.of(defaultSubscription)); - public static SourceProto.Source defaultSource = createSource("localhost", "topic"); + static SourceProto.Source defaultSource = createSource("localhost", "topic"); + + public static Triple getDefaultSubscription() { + return defaultSubscription; + } + + public static StoreProto.Store getDefaultStore() { + return defaultStore; + } + + public static SourceProto.Source getDefaultSource() { + return defaultSource; + } public static SourceProto.Source createSource(String server, String topic) { return SourceProto.Source.newBuilder() diff --git a/core/src/test/java/feast/core/service/JobCoordinatorIT.java b/core/src/test/java/feast/core/service/JobCoordinatorIT.java index e6423dd8a4..47fe01fe97 100644 --- a/core/src/test/java/feast/core/service/JobCoordinatorIT.java +++ b/core/src/test/java/feast/core/service/JobCoordinatorIT.java @@ -79,7 +79,7 @@ public static void globalSetUp(@Value("${grpc.server.port}") int port) { @BeforeEach public void setUp(TestInfo testInfo) { - apiClient.updateStore(DataGenerator.defaultStore); + apiClient.updateStore(DataGenerator.getDefaultStore()); specsMailbox = new ArrayList<>(); @@ -101,7 +101,7 @@ public void listenSpecs(ConsumerRecord record) @Test public void shouldCreateJobForNewSource() { apiClient.simpleApplyFeatureSet( - DataGenerator.defaultSource, + DataGenerator.getDefaultSource(), "default", "test", Collections.emptyList(), @@ -124,7 +124,7 @@ public void shouldCreateJobForNewSource() { @Test public void shouldUpgradeJobWhenStoreChanged() { apiClient.simpleApplyFeatureSet( - DataGenerator.defaultSource, + DataGenerator.getDefaultSource(), "project", "test", Collections.emptyList(), @@ -136,7 +136,7 @@ public void shouldUpgradeJobWhenStoreChanged() { DataGenerator.createStore( "new-store", StoreProto.Store.StoreType.REDIS, - ImmutableList.of(DataGenerator.defaultSubscription))); + ImmutableList.of(DataGenerator.getDefaultSubscription()))); await() .until( @@ -152,7 +152,7 @@ public void shouldUpgradeJobWhenStoreChanged() { @Test public void shouldRestoreJobThatStopped() { apiClient.simpleApplyFeatureSet( - DataGenerator.defaultSource, + DataGenerator.getDefaultSource(), "project", "test", Collections.emptyList(), @@ -190,18 +190,18 @@ public void shouldSendNewSpec() { job = Job.builder() - .setSource(Source.fromProto(DataGenerator.defaultSource)) + .setSource(Source.fromProto(DataGenerator.getDefaultSource())) .setId("some-running-id") .setStatus(JobStatus.RUNNING) .build(); jobManager.startJob(job); - job.setStores(ImmutableSet.of(Store.fromProto(DataGenerator.defaultStore))); + job.setStores(ImmutableSet.of(Store.fromProto(DataGenerator.getDefaultStore()))); jobRepository.saveAndFlush(job); apiClient.simpleApplyFeatureSet( - DataGenerator.defaultSource, + DataGenerator.getDefaultSource(), "default", "test", ImmutableList.of(Pair.of("entity", ValueProto.ValueType.Enum.BOOL)), @@ -230,7 +230,7 @@ public void shouldSendNewSpec() { @Order(2) public void shouldUpdateSpec() { apiClient.simpleApplyFeatureSet( - DataGenerator.defaultSource, + DataGenerator.getDefaultSource(), "default", "test", ImmutableList.of(Pair.of("entity", ValueProto.ValueType.Enum.BOOL)), diff --git a/core/src/test/resources/application-it.properties b/core/src/test/resources/application-it.properties index 28ff612a09..99cfc876a2 100644 --- a/core/src/test/resources/application-it.properties +++ b/core/src/test/resources/application-it.properties @@ -14,7 +14,7 @@ # limitations under the License. # # -grpc.server.port=${GRPC_PORT:6666} +grpc.server.port=6666 feast.security.authentication.enabled = false feast.security.authorization.enabled = false From 2cb15eb51f2f6412d68711610190b7fe516f6ce0 Mon Sep 17 00:00:00 2001 From: pyalex Date: Fri, 17 Jul 2020 11:20:53 +0300 Subject: [PATCH 06/19] comments for BaseIT --- core/src/test/java/feast/core/it/BaseIT.java | 33 ++++++++++++++++++-- 1 file changed, 31 insertions(+), 2 deletions(-) diff --git a/core/src/test/java/feast/core/it/BaseIT.java b/core/src/test/java/feast/core/it/BaseIT.java index 839fbf2cad..597bc1872e 100644 --- a/core/src/test/java/feast/core/it/BaseIT.java +++ b/core/src/test/java/feast/core/it/BaseIT.java @@ -54,6 +54,10 @@ import org.testcontainers.junit.jupiter.Container; import org.testcontainers.junit.jupiter.Testcontainers; +/** + * Base Integration Test class. Setups postgres & kafka containers. Configures related properties & + * beans. Provides DB related clean up between tests. + */ @SpringBootTest @ActiveProfiles("it") @Testcontainers @@ -64,6 +68,11 @@ public class BaseIT { @Container public static KafkaContainer kafka = new KafkaContainer(); + /** + * Configure Spring Application to use postgres & kafka rolled out in containers + * + * @param registry + */ @DynamicPropertySource static void properties(DynamicPropertyRegistry registry) { @@ -75,6 +84,11 @@ static void properties(DynamicPropertyRegistry registry) { registry.add("feast.stream.options.bootstrapServers", kafka::getBootstrapServers); } + /** + * SequentialFlow is base class that is supposed to be inherited by @Nested test classes that + * wants to preserve context between test cases. For SequentialFlow databases is being truncated + * only once after all tests passed. + */ @TestInstance(TestInstance.Lifecycle.PER_CLASS) public class SequentialFlow { @AfterAll @@ -83,6 +97,11 @@ public void tearDown() throws Exception { } } + /** + * This class must be inherited inside IT Class and annotated with {@link + * org.springframework.boot.test.context.TestConfiguration}. It provides configuration needed to + * communicate with Feast via Kafka + */ public static class BaseTestConfig { @Bean public KafkaListenerContainerFactory> @@ -121,6 +140,12 @@ public KafkaTemplate specAckKafkaTe } } + /** + * Truncates all tables in Database (between tests or flows). Retries on deadlock + * + * @param em EntityManager + * @throws SQLException + */ public static void cleanTables(EntityManager em) throws SQLException { List tableNames = em.getMetamodel().getEntities().stream() @@ -143,11 +168,14 @@ public static void cleanTables(EntityManager em) throws SQLException { // and that often leads to Deadlock // since SpringApp is still running in another thread var num_retries = 5; - for (var i = 0; i < num_retries; i++) { + for (var i = 1; i <= num_retries; i++) { try { Statement statement = connection.createStatement(); statement.execute(String.format("truncate %s cascade", String.join(", ", tableNames))); } catch (SQLException e) { + if (i == num_retries) { + throw e; + } continue; } @@ -157,12 +185,13 @@ public static void cleanTables(EntityManager em) throws SQLException { @PersistenceContext EntityManager entityManager; + /** Used to determine SequentialFlows */ public Boolean isNestedTest(TestInfo testInfo) { return testInfo.getTestClass().get().getAnnotation(Nested.class) != null; } @AfterEach - public void tearDown(TestInfo testInfo) throws SQLException { + public void tearDown(TestInfo testInfo) throws Exception { if (isNestedTest(testInfo)) { return; } From 37daacb4e55afb392bf5f1db5d14c420f03ed88e Mon Sep 17 00:00:00 2001 From: pyalex Date: Fri, 17 Jul 2020 11:28:58 +0300 Subject: [PATCH 07/19] comments for BaseIT --- core/src/test/java/feast/core/it/BaseIT.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/core/src/test/java/feast/core/it/BaseIT.java b/core/src/test/java/feast/core/it/BaseIT.java index 597bc1872e..e7eaa13d48 100644 --- a/core/src/test/java/feast/core/it/BaseIT.java +++ b/core/src/test/java/feast/core/it/BaseIT.java @@ -55,8 +55,8 @@ import org.testcontainers.junit.jupiter.Testcontainers; /** - * Base Integration Test class. Setups postgres & kafka containers. Configures related properties & - * beans. Provides DB related clean up between tests. + * Base Integration Test class. Setups postgres and kafka containers. Configures related properties + * and beans. Provides DB related clean up between tests. */ @SpringBootTest @ActiveProfiles("it") @@ -69,7 +69,7 @@ public class BaseIT { @Container public static KafkaContainer kafka = new KafkaContainer(); /** - * Configure Spring Application to use postgres & kafka rolled out in containers + * Configure Spring Application to use postgres and kafka rolled out in containers * * @param registry */ From 6fa240933a4b4d6b89795f2e152f851679d55681 Mon Sep 17 00:00:00 2001 From: Willem Pienaar Date: Fri, 17 Jul 2020 22:38:25 +0800 Subject: [PATCH 08/19] Update POM to pass maven verify --- pom.xml | 13 +++++++++++-- 1 file changed, 11 insertions(+), 2 deletions(-) diff --git a/pom.xml b/pom.xml index 529b60b7dc..5c18cb4b03 100644 --- a/pom.xml +++ b/pom.xml @@ -572,8 +572,6 @@ 5.6.2 - - integration-tests @@ -583,6 +581,14 @@ + + + ${groupId}:${artifactId} + + + ${project.build.outputDirectory} + + org.springframework.boot @@ -742,6 +748,9 @@ + + exec + org.xolstice.maven.plugins From 3391e69235880fea5fcee89258d50fcffb22e2f8 Mon Sep 17 00:00:00 2001 From: Willem Pienaar Date: Fri, 17 Jul 2020 22:39:53 +0800 Subject: [PATCH 09/19] Update make command to support verify for integration tests --- Makefile | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Makefile b/Makefile index 00b3dbd810..9a6b2ae7f7 100644 --- a/Makefile +++ b/Makefile @@ -47,7 +47,7 @@ test-java: mvn --no-transfer-progress test test-java-integration: - mvn --no-transfer-progress -DskipUTs test-compile failsafe:integration-test failsafe:verify + mvn --no-transfer-progress -Dmaven.javadoc.skip=true -Dgpg.skip -DskipUTs=true clean verify test-java-with-coverage: mvn --no-transfer-progress test jacoco:report-aggregate From f3deadc2d07c6ff47a71bd7d6a8aad5d786a23ad Mon Sep 17 00:00:00 2001 From: pyalex Date: Sun, 19 Jul 2020 12:51:54 +0300 Subject: [PATCH 10/19] skip unit tests in e2e --- infra/scripts/setup-common-functions.sh | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/infra/scripts/setup-common-functions.sh b/infra/scripts/setup-common-functions.sh index 01bc21be22..6cfe115331 100755 --- a/infra/scripts/setup-common-functions.sh +++ b/infra/scripts/setup-common-functions.sh @@ -72,7 +72,7 @@ build_feast_core_and_serving() { --output-dir /root/ # Build jars for Feast - mvn --quiet --batch-mode --define skipTests=true clean package + mvn --quiet --batch-mode -Dmaven.javadoc.skip=true -Dgpg.skip -DskipUTs=true clean package ls -lh core/target/*jar ls -lh serving/target/*jar From 7b5d98628482dc4793d8eae5d03af1436b0f55b8 Mon Sep 17 00:00:00 2001 From: pyalex Date: Sun, 19 Jul 2020 12:55:52 +0300 Subject: [PATCH 11/19] skip unit tests in docker build --- infra/docker/core/Dockerfile | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/infra/docker/core/Dockerfile b/infra/docker/core/Dockerfile index f210e1c40c..4131ba1dcb 100644 --- a/infra/docker/core/Dockerfile +++ b/infra/docker/core/Dockerfile @@ -13,7 +13,7 @@ WORKDIR /build # ENV MAVEN_OPTS="-Dmaven.repo.local=/build/.m2/repository -DdependencyLocationsEnabled=false" RUN mvn --also-make --projects core,ingestion -Drevision=$REVISION \ - -DskipTests=true --batch-mode clean package + -DskipUTs=true --batch-mode clean package # # Unpack the jar and copy the files into production Docker image # for faster startup time when starting Dataflow jobs from Feast Core. From 6eec3e50ea008b5eea36ad0361488ebe184a0a67 Mon Sep 17 00:00:00 2001 From: pyalex Date: Sun, 19 Jul 2020 13:07:19 +0300 Subject: [PATCH 12/19] build verbose --- infra/scripts/setup-common-functions.sh | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/infra/scripts/setup-common-functions.sh b/infra/scripts/setup-common-functions.sh index 6cfe115331..11727745d6 100755 --- a/infra/scripts/setup-common-functions.sh +++ b/infra/scripts/setup-common-functions.sh @@ -72,7 +72,7 @@ build_feast_core_and_serving() { --output-dir /root/ # Build jars for Feast - mvn --quiet --batch-mode -Dmaven.javadoc.skip=true -Dgpg.skip -DskipUTs=true clean package + mvn --batch-mode -Dmaven.javadoc.skip=true -Dgpg.skip -DskipUTs=true clean package ls -lh core/target/*jar ls -lh serving/target/*jar From 8eff27a77324c778cf2d267e8bed4e549a2e341c Mon Sep 17 00:00:00 2001 From: pyalex Date: Sun, 19 Jul 2020 13:21:28 +0300 Subject: [PATCH 13/19] run exec.jar --- infra/scripts/setup-common-functions.sh | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/infra/scripts/setup-common-functions.sh b/infra/scripts/setup-common-functions.sh index 11727745d6..4710f3369c 100755 --- a/infra/scripts/setup-common-functions.sh +++ b/infra/scripts/setup-common-functions.sh @@ -72,7 +72,7 @@ build_feast_core_and_serving() { --output-dir /root/ # Build jars for Feast - mvn --batch-mode -Dmaven.javadoc.skip=true -Dgpg.skip -DskipUTs=true clean package + mvn --quiet --batch-mode -Dmaven.javadoc.skip=true -Dgpg.skip -DskipUTs=true clean package ls -lh core/target/*jar ls -lh serving/target/*jar @@ -86,7 +86,7 @@ start_feast_core() { export CONFIG_ARG="--spring.config.location=classpath:/application.yml,file://$1" fi - nohup java -jar core/target/feast-core-$FEAST_BUILD_VERSION.jar $CONFIG_ARG &>/var/log/feast-core.log & + nohup java -jar core/target/feast-core-$FEAST_BUILD_VERSION-exec.jar $CONFIG_ARG &>/var/log/feast-core.log & ${SCRIPTS_DIR}/wait-for-it.sh localhost:6565 --timeout=90 tail -n10 /var/log/feast-core.log @@ -101,7 +101,7 @@ start_feast_serving() { export CONFIG_ARG="--spring.config.location=classpath:/application.yml,file://$1" fi - nohup java -jar serving/target/feast-serving-$FEAST_BUILD_VERSION.jar $CONFIG_ARG &>/var/log/feast-serving-online.log & + nohup java -jar serving/target/feast-serving-$FEAST_BUILD_VERSION-exec.jar $CONFIG_ARG &>/var/log/feast-serving-online.log & ${SCRIPTS_DIR}/wait-for-it.sh localhost:6566 --timeout=60 tail -n100 /var/log/feast-serving-online.log From c0451b5abb1322f77cb65ec1477ca9069fc64053 Mon Sep 17 00:00:00 2001 From: pyalex Date: Sun, 19 Jul 2020 13:33:17 +0300 Subject: [PATCH 14/19] serving docker build w/o tests --- infra/docker/serving/Dockerfile | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/infra/docker/serving/Dockerfile b/infra/docker/serving/Dockerfile index 8f2abf5b75..19d9fd9288 100644 --- a/infra/docker/serving/Dockerfile +++ b/infra/docker/serving/Dockerfile @@ -13,7 +13,7 @@ WORKDIR /build # ENV MAVEN_OPTS="-Dmaven.repo.local=/build/.m2/repository -DdependencyLocationsEnabled=false" RUN mvn --also-make --projects serving -Drevision=$REVISION \ - -DskipTests=true --batch-mode clean package + -DskipUTs=true --batch-mode clean package # # Download grpc_health_probe to run health check for Feast Serving # https://kubernetes.io/blog/2018/10/01/health-checking-grpc-servers-on-kubernetes/ From 4390d496f9c76b73ac4419a375c1bd27b533be91 Mon Sep 17 00:00:00 2001 From: pyalex Date: Sun, 19 Jul 2020 14:04:05 +0300 Subject: [PATCH 15/19] using exec jar in docker --- infra/docker/core/Dockerfile | 2 +- infra/docker/serving/Dockerfile | 2 +- .../storage/connectors/bigquery/writer/BigQuerySinkTest.java | 2 -- 3 files changed, 2 insertions(+), 4 deletions(-) diff --git a/infra/docker/core/Dockerfile b/infra/docker/core/Dockerfile index 4131ba1dcb..657c60a5c4 100644 --- a/infra/docker/core/Dockerfile +++ b/infra/docker/core/Dockerfile @@ -40,7 +40,7 @@ RUN wget -q https://github.com/grpc-ecosystem/grpc-health-probe/releases/downloa FROM openjdk:11-jre as production ARG REVISION=dev -COPY --from=builder /build/core/target/feast-core-$REVISION.jar /opt/feast/feast-core.jar +COPY --from=builder /build/core/target/feast-core-$REVISION-exec.jar /opt/feast/feast-core.jar # Required for staging jar dependencies when submitting Dataflow jobs. COPY --from=builder /build/core/target/feast-core-$REVISION /opt/feast/feast-core COPY --from=builder /usr/bin/grpc-health-probe /usr/bin/grpc-health-probe diff --git a/infra/docker/serving/Dockerfile b/infra/docker/serving/Dockerfile index 19d9fd9288..e92338db95 100644 --- a/infra/docker/serving/Dockerfile +++ b/infra/docker/serving/Dockerfile @@ -28,7 +28,7 @@ RUN wget -q https://github.com/grpc-ecosystem/grpc-health-probe/releases/downloa FROM openjdk:11-jre-slim as production ARG REVISION=dev -COPY --from=builder /build/serving/target/feast-serving-$REVISION.jar /opt/feast/feast-serving.jar +COPY --from=builder /build/serving/target/feast-serving-$REVISION-exec.jar /opt/feast/feast-serving.jar COPY --from=builder /usr/bin/grpc-health-probe /usr/bin/grpc-health-probe CMD ["java",\ "-Xms1024m",\ diff --git a/storage/connectors/bigquery/src/test/java/feast/storage/connectors/bigquery/writer/BigQuerySinkTest.java b/storage/connectors/bigquery/src/test/java/feast/storage/connectors/bigquery/writer/BigQuerySinkTest.java index ba2089414b..aae20752a8 100644 --- a/storage/connectors/bigquery/src/test/java/feast/storage/connectors/bigquery/writer/BigQuerySinkTest.java +++ b/storage/connectors/bigquery/src/test/java/feast/storage/connectors/bigquery/writer/BigQuerySinkTest.java @@ -396,8 +396,6 @@ public void updateSpecInFlight() { AvroCoder.of(FeatureSetReference.class), ProtoCoder.of(FeatureSetSpec.class))) .advanceWatermarkTo(Instant.now()) .addElements(KV.of(FeatureSetReference.of("myproject", "fs", 1), spec)) - .advanceProcessingTime(Duration.standardSeconds(5)) - // .advanceWatermarkTo(Instant.now().plus(Duration.standardSeconds(5))) .addElements(KV.of(FeatureSetReference.of("myproject", "fs", 1), spec_fs_2)) .advanceWatermarkToInfinity(); From 50b8309acf300a4b77fa9a46a67d114f607aed39 Mon Sep 17 00:00:00 2001 From: pyalex Date: Tue, 21 Jul 2020 12:49:07 +0800 Subject: [PATCH 16/19] unpack exec jar --- infra/docker/core/Dockerfile | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/infra/docker/core/Dockerfile b/infra/docker/core/Dockerfile index 657c60a5c4..66d695bb6f 100644 --- a/infra/docker/core/Dockerfile +++ b/infra/docker/core/Dockerfile @@ -23,7 +23,7 @@ RUN mvn --also-make --projects core,ingestion -Drevision=$REVISION \ # # https://github.com/feast-dev/feast/pull/291 RUN apt-get -qq update && apt-get -y install unar && \ - unar /build/core/target/feast-core-$REVISION.jar -o /build/core/target/ + unar /build/core/target/feast-core-$REVISION-exec.jar -o /build/core/target/ # # Download grpc_health_probe to run health check for Feast Serving From af822cb83d87a483096d5e3a0c61b5fe0d4a2722 Mon Sep 17 00:00:00 2001 From: pyalex Date: Tue, 21 Jul 2020 13:11:17 +0800 Subject: [PATCH 17/19] unpack exec jar --- infra/docker/core/Dockerfile | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/infra/docker/core/Dockerfile b/infra/docker/core/Dockerfile index 66d695bb6f..a34b5ccd05 100644 --- a/infra/docker/core/Dockerfile +++ b/infra/docker/core/Dockerfile @@ -42,7 +42,7 @@ ARG REVISION=dev COPY --from=builder /build/core/target/feast-core-$REVISION-exec.jar /opt/feast/feast-core.jar # Required for staging jar dependencies when submitting Dataflow jobs. -COPY --from=builder /build/core/target/feast-core-$REVISION /opt/feast/feast-core +COPY --from=builder /build/core/target/feast-core-$REVISION-exec /opt/feast/feast-core COPY --from=builder /usr/bin/grpc-health-probe /usr/bin/grpc-health-probe CMD ["java",\ From 03f00e5a75c1e7ad8370eedbe68fe16c5f16448a Mon Sep 17 00:00:00 2001 From: pyalex Date: Tue, 21 Jul 2020 14:08:59 +0800 Subject: [PATCH 18/19] pr comments --- .../core/config/FeatureStreamConfig.java | 21 ++++++ .../java/feast/core/it/DataGenerator.java | 37 +++++++++++ .../java/feast/core/it/SimpleAPIClient.java | 40 +---------- .../feast/core/service/JobCoordinatorIT.java | 66 ++++++++++--------- pom.xml | 18 ----- 5 files changed, 96 insertions(+), 86 deletions(-) diff --git a/core/src/main/java/feast/core/config/FeatureStreamConfig.java b/core/src/main/java/feast/core/config/FeatureStreamConfig.java index 40778b5be3..6130e0b23b 100644 --- a/core/src/main/java/feast/core/config/FeatureStreamConfig.java +++ b/core/src/main/java/feast/core/config/FeatureStreamConfig.java @@ -97,6 +97,13 @@ public NewTopic featureSetSpecsAckTopic(FeastProperties feastProperties) { (short) 1); } + /** + * Creates kafka publisher for sending FeatureSetSpecs to ingestion job. Uses ProtoSerializer to + * serialize FeatureSetSpec. + * + * @param feastProperties + * @return + */ @Bean public KafkaTemplate specKafkaTemplate( FeastProperties feastProperties) { @@ -115,6 +122,13 @@ public KafkaTemplate specKafkaTemplate( return t; } + /** + * Set configured consumerFactory for specs acknowledgment topic (see ackConsumerFactory) as + * default for KafkaListener. + * + * @param consumerFactory + * @return + */ @Bean KafkaListenerContainerFactory< ConcurrentMessageListenerContainer> @@ -126,6 +140,13 @@ public KafkaTemplate specKafkaTemplate( return factory; } + /** + * Prepares kafka consumer (by configuring ConsumerFactory) to receive acknowledgments from + * IngestionJob on successful updates of FeatureSetSpecs. + * + * @param feastProperties + * @return ConsumerFactory for FeatureSetSpecAck + */ @Bean public ConsumerFactory ackConsumerFactory( FeastProperties feastProperties) { diff --git a/core/src/test/java/feast/core/it/DataGenerator.java b/core/src/test/java/feast/core/it/DataGenerator.java index 114ceecb9f..d09d25514a 100644 --- a/core/src/test/java/feast/core/it/DataGenerator.java +++ b/core/src/test/java/feast/core/it/DataGenerator.java @@ -17,10 +17,13 @@ package feast.core.it; import com.google.common.collect.ImmutableList; +import feast.proto.core.FeatureSetProto; import feast.proto.core.SourceProto; import feast.proto.core.StoreProto; +import feast.proto.types.ValueProto; import java.util.List; import java.util.stream.Collectors; +import org.apache.commons.lang3.tuple.Pair; import org.apache.commons.lang3.tuple.Triple; public class DataGenerator { @@ -75,4 +78,38 @@ public static StoreProto.Store createStore( .setType(type) .build(); } + + public static FeatureSetProto.FeatureSet createFeatureSet( + SourceProto.Source source, + String projectName, + String name, + List> entities, + List> features) { + return FeatureSetProto.FeatureSet.newBuilder() + .setSpec( + FeatureSetProto.FeatureSetSpec.newBuilder() + .setSource(source) + .setName(name) + .setProject(projectName) + .addAllEntities( + entities.stream() + .map( + pair -> + FeatureSetProto.EntitySpec.newBuilder() + .setName(pair.getLeft()) + .setValueType(pair.getRight()) + .build()) + .collect(Collectors.toList())) + .addAllFeatures( + features.stream() + .map( + pair -> + FeatureSetProto.FeatureSpec.newBuilder() + .setName(pair.getLeft()) + .setValueType(pair.getRight()) + .build()) + .collect(Collectors.toList())) + .build()) + .build(); + } } diff --git a/core/src/test/java/feast/core/it/SimpleAPIClient.java b/core/src/test/java/feast/core/it/SimpleAPIClient.java index 6fc7a1fa09..1dbc38adc6 100644 --- a/core/src/test/java/feast/core/it/SimpleAPIClient.java +++ b/core/src/test/java/feast/core/it/SimpleAPIClient.java @@ -17,10 +17,7 @@ package feast.core.it; import feast.proto.core.*; -import feast.proto.types.ValueProto; import java.util.List; -import java.util.stream.Collectors; -import org.apache.commons.lang3.tuple.Pair; public class SimpleAPIClient { private CoreServiceGrpc.CoreServiceBlockingStub stub; @@ -29,42 +26,9 @@ public SimpleAPIClient(CoreServiceGrpc.CoreServiceBlockingStub stub) { this.stub = stub; } - public void simpleApplyFeatureSet( - SourceProto.Source source, - String projectName, - String name, - List> entities, - List> features) { + public void simpleApplyFeatureSet(FeatureSetProto.FeatureSet featureSet) { stub.applyFeatureSet( - CoreServiceProto.ApplyFeatureSetRequest.newBuilder() - .setFeatureSet( - FeatureSetProto.FeatureSet.newBuilder() - .setSpec( - FeatureSetProto.FeatureSetSpec.newBuilder() - .setSource(source) - .setName(name) - .setProject(projectName) - .addAllEntities( - entities.stream() - .map( - pair -> - FeatureSetProto.EntitySpec.newBuilder() - .setName(pair.getLeft()) - .setValueType(pair.getRight()) - .build()) - .collect(Collectors.toList())) - .addAllFeatures( - features.stream() - .map( - pair -> - FeatureSetProto.FeatureSpec.newBuilder() - .setName(pair.getLeft()) - .setValueType(pair.getRight()) - .build()) - .collect(Collectors.toList())) - .build()) - .build()) - .build()); + CoreServiceProto.ApplyFeatureSetRequest.newBuilder().setFeatureSet(featureSet).build()); } public List simpleListFeatureSets(String name) { diff --git a/core/src/test/java/feast/core/service/JobCoordinatorIT.java b/core/src/test/java/feast/core/service/JobCoordinatorIT.java index 47fe01fe97..2a87e7378a 100644 --- a/core/src/test/java/feast/core/service/JobCoordinatorIT.java +++ b/core/src/test/java/feast/core/service/JobCoordinatorIT.java @@ -101,11 +101,12 @@ public void listenSpecs(ConsumerRecord record) @Test public void shouldCreateJobForNewSource() { apiClient.simpleApplyFeatureSet( - DataGenerator.getDefaultSource(), - "default", - "test", - Collections.emptyList(), - Collections.emptyList()); + DataGenerator.createFeatureSet( + DataGenerator.getDefaultSource(), + "default", + "test", + Collections.emptyList(), + Collections.emptyList())); List featureSets = apiClient.simpleListFeatureSets("*"); assertThat(featureSets.size(), equalTo(1)); @@ -124,11 +125,12 @@ public void shouldCreateJobForNewSource() { @Test public void shouldUpgradeJobWhenStoreChanged() { apiClient.simpleApplyFeatureSet( - DataGenerator.getDefaultSource(), - "project", - "test", - Collections.emptyList(), - Collections.emptyList()); + DataGenerator.createFeatureSet( + DataGenerator.getDefaultSource(), + "project", + "test", + Collections.emptyList(), + Collections.emptyList())); await().until(jobManager::getAllJobs, hasSize(1)); @@ -152,11 +154,12 @@ public void shouldUpgradeJobWhenStoreChanged() { @Test public void shouldRestoreJobThatStopped() { apiClient.simpleApplyFeatureSet( - DataGenerator.getDefaultSource(), - "project", - "test", - Collections.emptyList(), - Collections.emptyList()); + DataGenerator.createFeatureSet( + DataGenerator.getDefaultSource(), + "project", + "test", + Collections.emptyList(), + Collections.emptyList())); await().until(jobManager::getAllJobs, hasSize(1)); Job job = jobRepository.findByStatus(JobStatus.RUNNING).get(0); @@ -201,11 +204,12 @@ public void shouldSendNewSpec() { jobRepository.saveAndFlush(job); apiClient.simpleApplyFeatureSet( - DataGenerator.getDefaultSource(), - "default", - "test", - ImmutableList.of(Pair.of("entity", ValueProto.ValueType.Enum.BOOL)), - Collections.emptyList()); + DataGenerator.createFeatureSet( + DataGenerator.getDefaultSource(), + "default", + "test", + ImmutableList.of(Pair.of("entity", ValueProto.ValueType.Enum.BOOL)), + Collections.emptyList())); FeatureSetProto.FeatureSet featureSet = apiClient.simpleGetFeatureSet("default", "test"); @@ -230,11 +234,12 @@ public void shouldSendNewSpec() { @Order(2) public void shouldUpdateSpec() { apiClient.simpleApplyFeatureSet( - DataGenerator.getDefaultSource(), - "default", - "test", - ImmutableList.of(Pair.of("entity", ValueProto.ValueType.Enum.BOOL)), - ImmutableList.of(Pair.of("feature", ValueProto.ValueType.Enum.INT32))); + DataGenerator.createFeatureSet( + DataGenerator.getDefaultSource(), + "default", + "test", + ImmutableList.of(Pair.of("entity", ValueProto.ValueType.Enum.BOOL)), + ImmutableList.of(Pair.of("feature", ValueProto.ValueType.Enum.INT32)))); await().until(() -> specsMailbox, hasSize(1)); @@ -291,11 +296,12 @@ public void shouldReallocateFeatureSetAfterSourceChanged() { assertThat(jobManager.getJobStatus(job), equalTo(JobStatus.RUNNING)); apiClient.simpleApplyFeatureSet( - DataGenerator.createSource("localhost", "newTopic"), - "default", - "test", - ImmutableList.of(Pair.of("entity", ValueProto.ValueType.Enum.BOOL)), - ImmutableList.of(Pair.of("feature", ValueProto.ValueType.Enum.INT32))); + DataGenerator.createFeatureSet( + DataGenerator.createSource("localhost", "newTopic"), + "default", + "test", + ImmutableList.of(Pair.of("entity", ValueProto.ValueType.Enum.BOOL)), + ImmutableList.of(Pair.of("feature", ValueProto.ValueType.Enum.INT32)))); await().until(() -> jobManager.getJobStatus(job), equalTo(JobStatus.ABORTED)); diff --git a/pom.xml b/pom.xml index 5c18cb4b03..a1a63b6f49 100644 --- a/pom.xml +++ b/pom.xml @@ -528,11 +528,6 @@ - - org.codehaus.mojo - build-helper-maven-plugin - 3.0.0 - org.apache.maven.plugins maven-surefire-plugin @@ -548,19 +543,6 @@ 5.5.2 - - - unit-tests - - test - - - - */test/java/** - - - - maven-failsafe-plugin From 2b6009fa28a3fc0b9ab56d3f22d03b1d37b785fd Mon Sep 17 00:00:00 2001 From: pyalex Date: Tue, 21 Jul 2020 14:25:18 +0800 Subject: [PATCH 19/19] clean collector registry --- .../main/java/feast/core/config/MonitoringConfig.java | 2 -- core/src/main/resources/application.yml | 2 +- core/src/test/java/feast/core/it/BaseIT.java | 9 +++++---- core/src/test/resources/application-it.properties | 1 - 4 files changed, 6 insertions(+), 8 deletions(-) diff --git a/core/src/main/java/feast/core/config/MonitoringConfig.java b/core/src/main/java/feast/core/config/MonitoringConfig.java index b3e9757454..53c9562c47 100644 --- a/core/src/main/java/feast/core/config/MonitoringConfig.java +++ b/core/src/main/java/feast/core/config/MonitoringConfig.java @@ -23,13 +23,11 @@ import io.prometheus.client.exporter.MetricsServlet; import javax.servlet.http.HttpServlet; import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.boot.web.servlet.ServletRegistrationBean; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration -@ConditionalOnProperty("management.metrics.export.simple.enabled") public class MonitoringConfig { private static final String PROMETHEUS_METRICS_PATH = "/metrics"; diff --git a/core/src/main/resources/application.yml b/core/src/main/resources/application.yml index 139b700fd8..fbdf603632 100644 --- a/core/src/main/resources/application.yml +++ b/core/src/main/resources/application.yml @@ -125,7 +125,7 @@ management: metrics: export: simple: - enabled: true + enabled: false statsd: enabled: true host: ${STATSD_HOST:localhost} diff --git a/core/src/test/java/feast/core/it/BaseIT.java b/core/src/test/java/feast/core/it/BaseIT.java index e7eaa13d48..1d99e8dcec 100644 --- a/core/src/test/java/feast/core/it/BaseIT.java +++ b/core/src/test/java/feast/core/it/BaseIT.java @@ -19,6 +19,7 @@ import feast.core.config.FeastProperties; import feast.core.util.KafkaSerialization; import feast.proto.core.IngestionJobProto; +import io.prometheus.client.CollectorRegistry; import java.sql.Connection; import java.sql.SQLException; import java.sql.Statement; @@ -192,10 +193,10 @@ public Boolean isNestedTest(TestInfo testInfo) { @AfterEach public void tearDown(TestInfo testInfo) throws Exception { - if (isNestedTest(testInfo)) { - return; - } + CollectorRegistry.defaultRegistry.clear(); - cleanTables(entityManager); + if (!isNestedTest(testInfo)) { + cleanTables(entityManager); + } } } diff --git a/core/src/test/resources/application-it.properties b/core/src/test/resources/application-it.properties index 99cfc876a2..a275dfabb0 100644 --- a/core/src/test/resources/application-it.properties +++ b/core/src/test/resources/application-it.properties @@ -28,4 +28,3 @@ spring.jpa.hibernate.ddl-auto=none spring.main.allow-bean-definition-overriding=true -management.metrics.export.simple.enabled=false