Skip to content

Commit

Permalink
Run JobCoontroller as separate application (#951)
Browse files Browse the repository at this point in the history
* moving jc

* unify baseIT

* move ingestion job related code to contrib

* infra

fix simple client

* fix core rest it

* revert bq changes

* fix path

* fix job coordinator it

* fix path in docker

* jc web port

* bean validation from apache.bval

* e2e auth

* default jc_url

* jc_ip in dataflow e2e

* docker compose & health impl

* core-host property

* clean

* more clean up

* format

* rename jc -> jobcontroller

* format

* jobcontroller docker compose config

* pr comments

* pr comments

* label regex

* fix version label
  • Loading branch information
pyalex authored Aug 21, 2020
1 parent 7d8126b commit 16bb358
Show file tree
Hide file tree
Showing 138 changed files with 3,428 additions and 1,616 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/complete.yml
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,11 @@ jobs:
runs-on: [self-hosted]
strategy:
matrix:
component: [core, serving, jupyter]
component: [core, serving, jobcontroller, jupyter]
env:
GITHUB_PR_SHA: ${{ github.event.pull_request.head.sha }}
REGISTRY: gcr.io/kf-feast
MAVEN_CACHE: gs://feast-templocation-kf-feast/.m2.2019-10-24.tar
MAVEN_CACHE: gs://feast-templocation-kf-feast/.m2.2020-08-19.tar
steps:
- uses: actions/checkout@v2
- uses: GoogleCloudPlatform/github-actions/setup-gcloud@master
Expand Down
4 changes: 2 additions & 2 deletions .github/workflows/master_only.yml
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,9 @@ jobs:
runs-on: [self-hosted]
strategy:
matrix:
component: [core, serving, jupyter, ci]
component: [core, serving, jobcontroller, jupyter, ci]
env:
MAVEN_CACHE: gs://feast-templocation-kf-feast/.m2.2019-10-24.tar
MAVEN_CACHE: gs://feast-templocation-kf-feast/.m2.2020-08-19.tar
steps:
- uses: actions/checkout@v2
- uses: GoogleCloudPlatform/github-actions/setup-gcloud@master
Expand Down
9 changes: 8 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -117,12 +117,16 @@ build-push-docker:
@$(MAKE) push-core-docker registry=$(REGISTRY) version=$(VERSION)
@$(MAKE) push-serving-docker registry=$(REGISTRY) version=$(VERSION)
@$(MAKE) push-ci-docker registry=$(REGISTRY) version=$(VERSION)
@$(MAKE) push-jobcontroller-docker registry=$(REGISTRY) version=$(VERSION)

build-docker: build-core-docker build-serving-docker build-ci-docker
build-docker: build-core-docker build-serving-docker build-ci-docker build-jobcontroller-docker

push-core-docker:
docker push $(REGISTRY)/feast-core:$(VERSION)

push-jobcontroller-docker:
docker push $(REGISTRY)/feast-jobcontroller:$(VERSION)

push-serving-docker:
docker push $(REGISTRY)/feast-serving:$(VERSION)

Expand All @@ -135,6 +139,9 @@ push-jupyter-docker:
build-core-docker:
docker build -t $(REGISTRY)/feast-core:$(VERSION) -f infra/docker/core/Dockerfile .

build-jobcontroller-docker:
docker build -t $(REGISTRY)/feast-jobcontroller:$(VERSION) -f infra/docker/jobcontroller/Dockerfile .

build-serving-docker:
docker build -t $(REGISTRY)/feast-serving:$(VERSION) -f infra/docker/serving/Dockerfile .

Expand Down
172 changes: 172 additions & 0 deletions common-test/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,172 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Copyright 2018-2020 The Feast Authors
~
~ Licensed under the Apache License, Version 2.0 (the "License");
~ you may not use this file except in compliance with the License.
~ You may obtain a copy of the License at
~
~ https://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
~
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<parent>
<artifactId>feast-parent</artifactId>
<groupId>dev.feast</groupId>
<version>${revision}</version>
</parent>

<name>Feast Common Test</name>
<description>Feast common module with test utilities</description>
<artifactId>feast-common-test</artifactId>

<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<version>3.0.0-M4</version>
<configuration>
<argLine>-Xms2048m -Xmx2048m -Djdk.net.URLClassPath.disableClassPathURLCheck=true</argLine>
</configuration>
</plugin>
</plugins>
</build>

<dependencies>
<dependency>
<groupId>dev.feast</groupId>
<artifactId>datatypes-java</artifactId>
<version>${project.version}</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>dev.feast</groupId>
<artifactId>feast-common</artifactId>
<version>${project.version}</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java-util</artifactId>
</dependency>
<!--compileOnly 'org.projectlombok:lombok:1.18.12'-->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>${lombok.version}</version>
</dependency>
<dependency>
<groupId>javax.validation</groupId>
<artifactId>validation-api</artifactId>
</dependency>
<dependency>
<groupId>com.google.auto.value</groupId>
<artifactId>auto-value-annotations</artifactId>
</dependency>
<dependency>
<groupId>com.google.code.gson</groupId>
<artifactId>gson</artifactId>
</dependency>

<!-- Logging -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</dependency>

<dependency>
<groupId>org.hamcrest</groupId>
<artifactId>hamcrest-library</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-test</artifactId>
<version>${spring.boot.version}</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-test-autoconfigure</artifactId>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-test</artifactId>
<version>${spring.version}</version>
</dependency>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>junit-jupiter</artifactId>
<version>1.14.3</version>
</dependency>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>postgresql</artifactId>
<version>1.14.3</version>
</dependency>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>kafka</artifactId>
<version>1.14.3</version>
</dependency>
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-api</artifactId>
<version>5.6.2</version>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
<groupId>io.prometheus</groupId>
<artifactId>simpleclient</artifactId>
<version>0.8.0</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
<version>3.4</version>
</dependency>
<dependency>
<groupId>org.awaitility</groupId>
<artifactId>awaitility</artifactId>
<version>3.0.0</version>
</dependency>
<dependency>
<groupId>org.awaitility</groupId>
<artifactId>awaitility-proxy</artifactId>
<version>3.0.0</version>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
<version>${mockito.version}</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>io.rest-assured</groupId>
<artifactId>rest-assured</artifactId>
<version>4.2.0</version>
</dependency>
<dependency>
<groupId>io.rest-assured</groupId>
<artifactId>json-path</artifactId>
<version>4.2.0</version>
</dependency>
<dependency>
<groupId>io.rest-assured</groupId>
<artifactId>xml-path</artifactId>
<version>4.2.0</version>
</dependency>
</dependencies>
</project>
Original file line number Diff line number Diff line change
Expand Up @@ -14,37 +14,24 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package feast.core.it;
package feast.common.it;

import feast.core.config.FeastProperties;
import feast.core.util.KafkaSerialization;
import feast.proto.core.IngestionJobProto;
import io.prometheus.client.CollectorRegistry;
import java.sql.Connection;
import java.sql.SQLException;
import java.sql.Statement;
import java.sql.*;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import javax.persistence.EntityManager;
import javax.persistence.PersistenceContext;
import javax.persistence.Table;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.hibernate.engine.spi.SessionImplementor;
import org.junit.jupiter.api.*;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.test.annotation.DirtiesContext;
import org.springframework.test.context.ActiveProfiles;
Expand Down Expand Up @@ -94,7 +81,7 @@ static void properties(DynamicPropertyRegistry registry) {
public class SequentialFlow {
@AfterAll
public void tearDown() throws Exception {
cleanTables(entityManager);
cleanTables();
}
}

Expand All @@ -118,60 +105,45 @@ public ConsumerFactory<String, byte[]> testConsumerFactory() {
Map<String, Object> props = new HashMap<>();

props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka.getBootstrapServers());
props.put(ConsumerConfig.GROUP_ID_CONFIG, "test");
props.put(ConsumerConfig.GROUP_ID_CONFIG, this.getClass().getName());

return new DefaultKafkaConsumerFactory<>(
props, new StringDeserializer(), new ByteArrayDeserializer());
}

@Bean
public KafkaTemplate<String, IngestionJobProto.FeatureSetSpecAck> specAckKafkaTemplate(
FeastProperties feastProperties) {
FeastProperties.StreamProperties streamProperties = feastProperties.getStream();
Map<String, Object> props = new HashMap<>();

props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka.getBootstrapServers());

KafkaTemplate<String, IngestionJobProto.FeatureSetSpecAck> t =
new KafkaTemplate<>(
new DefaultKafkaProducerFactory<>(
props, new StringSerializer(), new KafkaSerialization.ProtoSerializer<>()));
t.setDefaultTopic(streamProperties.getSpecsOptions().getSpecsAckTopic());
return t;
}
}

/**
* Truncates all tables in Database (between tests or flows). Retries on deadlock
*
* @param em EntityManager
* @throws SQLException
*/
public static void cleanTables(EntityManager em) throws SQLException {
List<String> tableNames =
em.getMetamodel().getEntities().stream()
.map(e -> e.getJavaType().getAnnotation(Table.class).name())
.collect(Collectors.toList());

// this trick needed to get EntityManager with Transaction
// and we don't want to wrap whole class into @Transactional
em = em.getEntityManagerFactory().createEntityManager();
// Transaction needed only once to do unwrap
SessionImplementor session = em.unwrap(SessionImplementor.class);

// and here we're actually don't want any transactions
// but instead we pulling raw connection
// to be able to retry query if needed
// since retrying rollbacked transaction is not that easy
Connection connection = session.connection();
public static void cleanTables() throws SQLException {
Connection connection =
DriverManager.getConnection(
postgreSQLContainer.getJdbcUrl(),
postgreSQLContainer.getUsername(),
postgreSQLContainer.getPassword());

List<String> tableNames = new ArrayList<>();
Statement statement = connection.createStatement();
ResultSet rs =
statement.executeQuery(
"SELECT table_name FROM information_schema.tables WHERE table_schema='public'");
while (rs.next()) {
tableNames.add(rs.getString(1));
}

if (tableNames.isEmpty()) {
return;
}

// retries are needed since truncate require exclusive lock
// and that often leads to Deadlock
// since SpringApp is still running in another thread
var num_retries = 5;
for (var i = 1; i <= num_retries; i++) {
int num_retries = 5;
for (int i = 1; i <= num_retries; i++) {
try {
Statement statement = connection.createStatement();
statement = connection.createStatement();
statement.execute(String.format("truncate %s cascade", String.join(", ", tableNames)));
} catch (SQLException e) {
if (i == num_retries) {
Expand All @@ -184,8 +156,6 @@ public static void cleanTables(EntityManager em) throws SQLException {
}
}

@PersistenceContext EntityManager entityManager;

/** Used to determine SequentialFlows */
public Boolean isSequentialTest(TestInfo testInfo) {
try {
Expand All @@ -201,7 +171,7 @@ public void tearDown(TestInfo testInfo) throws Exception {
CollectorRegistry.defaultRegistry.clear();

if (!isSequentialTest(testInfo)) {
cleanTables(entityManager);
cleanTables();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package feast.core.it;
package feast.common.it;

import com.google.common.collect.ImmutableList;
import feast.proto.core.FeatureSetProto;
Expand Down
Loading

0 comments on commit 16bb358

Please sign in to comment.