diff --git a/storage/connectors/bigquery/pom.xml b/storage/connectors/bigquery/pom.xml
new file mode 100644
index 0000000000..bca3da671c
--- /dev/null
+++ b/storage/connectors/bigquery/pom.xml
@@ -0,0 +1,79 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+    <parent>
+        <groupId>dev.feast</groupId>
+        <artifactId>feast-storage-connectors</artifactId>
+        <version>${revision}</version>
+    </parent>
+
+    <modelVersion>4.0.0</modelVersion>
+    <artifactId>feast-storage-connector-bigquery</artifactId>
+
+    <name>Feast Storage Connector for BigQuery</name>
+
+    <dependencies>
+        <dependency>
+            <groupId>io.pebbletemplates</groupId>
+            <artifactId>pebble</artifactId>
+            <version>3.1.0</version>
+        </dependency>
+
+        <dependency>
+            <groupId>com.google.cloud</groupId>
+            <artifactId>google-cloud-bigquery</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>com.google.cloud</groupId>
+            <artifactId>google-cloud-storage</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>com.google.auto.value</groupId>
+            <artifactId>auto-value-annotations</artifactId>
+            <version>1.6.6</version>
+        </dependency>
+
+        <dependency>
+            <groupId>com.google.auto.value</groupId>
+            <artifactId>auto-value</artifactId>
+            <version>1.6.6</version>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+            <version>4.12</version>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.beam</groupId>
+            <artifactId>beam-runners-direct-java</artifactId>
+            <version>${org.apache.beam.version}</version>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.hamcrest</groupId>
+            <artifactId>hamcrest-core</artifactId>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.hamcrest</groupId>
+            <artifactId>hamcrest-library</artifactId>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.beam</groupId>
+            <artifactId>beam-sdks-java-io-google-cloud-platform</artifactId>
+            <version>2.16.0</version>
+            <scope>compile</scope>
+        </dependency>
+    </dependencies>
+
+</project>
diff --git a/storage/connectors/bigquery/src/main/java/feast/storage/connectors/bigquery/common/TypeUtil.java b/storage/connectors/bigquery/src/main/java/feast/storage/connectors/bigquery/common/TypeUtil.java
new file mode 100644
index 0000000000..dcd1309317
--- /dev/null
+++ b/storage/connectors/bigquery/src/main/java/feast/storage/connectors/bigquery/common/TypeUtil.java
@@ -0,0 +1,66 @@
+/*
+ * SPDX-License-Identifier: Apache-2.0
+ * Copyright 2018-2020 The Feast Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package feast.storage.connectors.bigquery.common;
+
+import com.google.cloud.bigquery.StandardSQLTypeName;
+import feast.types.ValueProto;
+import java.util.HashMap;
+import java.util.Map;
+
+public class TypeUtil {
+
+ private static final Map<ValueProto.ValueType.Enum, StandardSQLTypeName>
+ VALUE_TYPE_TO_STANDARD_SQL_TYPE = new HashMap<>();
+
+ static {
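+ // BigQuery Standard SQL has no 32-bit numeric types, so INT32 maps to INT64 and FLOAT maps to
+ // FLOAT64. List types map to their element type; the sink marks those fields as REPEATED.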
+ VALUE_TYPE_TO_STANDARD_SQL_TYPE.put(ValueProto.ValueType.Enum.BYTES, StandardSQLTypeName.BYTES);
+ VALUE_TYPE_TO_STANDARD_SQL_TYPE.put(
+ ValueProto.ValueType.Enum.STRING, StandardSQLTypeName.STRING);
+ VALUE_TYPE_TO_STANDARD_SQL_TYPE.put(ValueProto.ValueType.Enum.INT32, StandardSQLTypeName.INT64);
+ VALUE_TYPE_TO_STANDARD_SQL_TYPE.put(ValueProto.ValueType.Enum.INT64, StandardSQLTypeName.INT64);
+ VALUE_TYPE_TO_STANDARD_SQL_TYPE.put(
+ ValueProto.ValueType.Enum.DOUBLE, StandardSQLTypeName.FLOAT64);
+ VALUE_TYPE_TO_STANDARD_SQL_TYPE.put(
+ ValueProto.ValueType.Enum.FLOAT, StandardSQLTypeName.FLOAT64);
+ VALUE_TYPE_TO_STANDARD_SQL_TYPE.put(ValueProto.ValueType.Enum.BOOL, StandardSQLTypeName.BOOL);
+ VALUE_TYPE_TO_STANDARD_SQL_TYPE.put(
+ ValueProto.ValueType.Enum.BYTES_LIST, StandardSQLTypeName.BYTES);
+ VALUE_TYPE_TO_STANDARD_SQL_TYPE.put(
+ ValueProto.ValueType.Enum.STRING_LIST, StandardSQLTypeName.STRING);
+ VALUE_TYPE_TO_STANDARD_SQL_TYPE.put(
+ ValueProto.ValueType.Enum.INT32_LIST, StandardSQLTypeName.INT64);
+ VALUE_TYPE_TO_STANDARD_SQL_TYPE.put(
+ ValueProto.ValueType.Enum.INT64_LIST, StandardSQLTypeName.INT64);
+ VALUE_TYPE_TO_STANDARD_SQL_TYPE.put(
+ ValueProto.ValueType.Enum.DOUBLE_LIST, StandardSQLTypeName.FLOAT64);
+ VALUE_TYPE_TO_STANDARD_SQL_TYPE.put(
+ ValueProto.ValueType.Enum.FLOAT_LIST, StandardSQLTypeName.FLOAT64);
+ VALUE_TYPE_TO_STANDARD_SQL_TYPE.put(
+ ValueProto.ValueType.Enum.BOOL_LIST, StandardSQLTypeName.BOOL);
+ }
+
+ /**
+ * Converts {@link feast.types.ValueProto.ValueType} to its corresponding {@link
+ * StandardSQLTypeName}
+ *
+ * @param valueType value type to convert
+ * @return {@link StandardSQLTypeName}
+ */
+ public static StandardSQLTypeName toStandardSqlType(ValueProto.ValueType.Enum valueType) {
+ return VALUE_TYPE_TO_STANDARD_SQL_TYPE.get(valueType);
+ }
+}
diff --git a/storage/connectors/bigquery/src/main/java/feast/storage/connectors/bigquery/retrieval/BigQueryBatchRetriever.java b/storage/connectors/bigquery/src/main/java/feast/storage/connectors/bigquery/retrieval/BigQueryBatchRetriever.java
new file mode 100644
index 0000000000..881cbc18eb
--- /dev/null
+++ b/storage/connectors/bigquery/src/main/java/feast/storage/connectors/bigquery/retrieval/BigQueryBatchRetriever.java
@@ -0,0 +1,414 @@
+/*
+ * SPDX-License-Identifier: Apache-2.0
+ * Copyright 2018-2020 The Feast Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package feast.storage.connectors.bigquery.retrieval;
+
+import static feast.storage.connectors.bigquery.retrieval.QueryTemplater.createEntityTableUUIDQuery;
+import static feast.storage.connectors.bigquery.retrieval.QueryTemplater.createTimestampLimitQuery;
+
+import com.google.auto.value.AutoValue;
+import com.google.cloud.RetryOption;
+import com.google.cloud.bigquery.*;
+import com.google.cloud.storage.Blob;
+import com.google.cloud.storage.Storage;
+import feast.serving.ServingAPIProto;
+import feast.storage.api.retrieval.BatchRetriever;
+import feast.storage.api.retrieval.FeatureSetRequest;
+import io.grpc.Status;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.*;
+import java.util.stream.Collectors;
+import org.slf4j.Logger;
+import org.threeten.bp.Duration;
+
+@AutoValue
+public abstract class BigQueryBatchRetriever implements BatchRetriever {
+
+ private static final Logger log = org.slf4j.LoggerFactory.getLogger(BigQueryBatchRetriever.class);
+
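+ // Temporary tables created during batch retrieval are set to expire after one day.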
+ public static final long TEMP_TABLE_EXPIRY_DURATION_MS = Duration.ofDays(1).toMillis();
+ private static final long SUBQUERY_TIMEOUT_SECS = 900; // 15 minutes
+
+ public abstract String projectId();
+
+ public abstract String datasetId();
+
+ public abstract BigQuery bigquery();
+
+ public abstract String jobStagingLocation();
+
+ public abstract int initialRetryDelaySecs();
+
+ public abstract int totalTimeoutSecs();
+
+ public abstract Storage storage();
+
+ public static Builder builder() {
+ return new AutoValue_BigQueryBatchRetriever.Builder();
+ }
+
+ @AutoValue.Builder
+ public abstract static class Builder {
+ public abstract Builder setProjectId(String projectId);
+
+ public abstract Builder setDatasetId(String datasetId);
+
+ public abstract Builder setJobStagingLocation(String jobStagingLocation);
+
+ public abstract Builder setBigquery(BigQuery bigquery);
+
+ public abstract Builder setInitialRetryDelaySecs(int initialRetryDelaySecs);
+
+ public abstract Builder setTotalTimeoutSecs(int totalTimeoutSecs);
+
+ public abstract Builder setStorage(Storage storage);
+
+ public abstract BigQueryBatchRetriever build();
+ }
+
+ @Override
+ public ServingAPIProto.Job getBatchFeatures(
+ ServingAPIProto.GetBatchFeaturesRequest request, List<FeatureSetRequest> featureSetRequests) {
+ // 0. Generate job ID
+ String feastJobId = UUID.randomUUID().toString();
+
+ List<FeatureSetQueryInfo> featureSetQueryInfos =
+ QueryTemplater.getFeatureSetInfos(featureSetRequests);
+
+ // 1. load entity table
+ Table entityTable;
+ String entityTableName;
+ try {
+ entityTable = loadEntities(request.getDatasetSource());
+
+ TableId entityTableWithUUIDs = generateUUIDs(entityTable);
+ entityTableName = generateFullTableName(entityTableWithUUIDs);
+ } catch (Exception e) {
+ return ServingAPIProto.Job.newBuilder()
+ .setId(feastJobId)
+ .setType(ServingAPIProto.JobType.JOB_TYPE_DOWNLOAD)
+ .setStatus(ServingAPIProto.JobStatus.JOB_STATUS_DONE)
+ .setDataFormat(ServingAPIProto.DataFormat.DATA_FORMAT_AVRO)
+ .setError(String.format("Unable to load entity table to BigQuery: %s", e.toString()))
+ .build();
+ }
+
+ Schema entityTableSchema = entityTable.getDefinition().getSchema();
+ List<String> entityTableColumnNames =
+ entityTableSchema.getFields().stream()
+ .map(Field::getName)
+ .filter(name -> !name.equals("event_timestamp"))
+ .collect(Collectors.toList());
+
+ // 2. Retrieve the temporal bounds of the entity dataset provided
+ FieldValueList timestampLimits = getTimestampLimits(entityTableName);
+
+ // 3. Generate the subqueries
+ List<String> featureSetQueries =
+ generateQueries(entityTableName, timestampLimits, featureSetQueryInfos);
+
+ QueryJobConfiguration queryConfig;
+
+ try {
+ // 4. Run the subqueries in parallel then collect the outputs
+ Job queryJob =
+ runBatchQuery(
+ entityTableName, entityTableColumnNames, featureSetQueryInfos, featureSetQueries);
+ queryConfig = queryJob.getConfiguration();
+ String exportTableDestinationUri =
+ String.format("%s/%s/*.avro", jobStagingLocation(), feastJobId);
+
+ // 5. Export the table
+ // Hardcode the format to Avro for now
+ ExtractJobConfiguration extractConfig =
+ ExtractJobConfiguration.of(
+ queryConfig.getDestinationTable(), exportTableDestinationUri, "Avro");
+ Job extractJob = bigquery().create(JobInfo.of(extractConfig));
+ waitForJob(extractJob);
+
+ } catch (BigQueryException | InterruptedException | IOException e) {
+ return ServingAPIProto.Job.newBuilder()
+ .setId(feastJobId)
+ .setType(ServingAPIProto.JobType.JOB_TYPE_DOWNLOAD)
+ .setStatus(ServingAPIProto.JobStatus.JOB_STATUS_DONE)
+ .setError(e.getMessage())
+ .build();
+ }
+
+ List<String> fileUris = parseOutputFileURIs(feastJobId);
+
+ return ServingAPIProto.Job.newBuilder()
+ .setId(feastJobId)
+ .setType(ServingAPIProto.JobType.JOB_TYPE_DOWNLOAD)
+ .setStatus(ServingAPIProto.JobStatus.JOB_STATUS_DONE)
+ .addAllFileUris(fileUris)
+ .setDataFormat(ServingAPIProto.DataFormat.DATA_FORMAT_AVRO)
+ .build();
+ }
+
+ private TableId generateUUIDs(Table loadedEntityTable) {
+ try {
+ String uuidQuery =
+ createEntityTableUUIDQuery(generateFullTableName(loadedEntityTable.getTableId()));
+ QueryJobConfiguration queryJobConfig =
+ QueryJobConfiguration.newBuilder(uuidQuery)
+ .setDestinationTable(TableId.of(projectId(), datasetId(), createTempTableName()))
+ .build();
+ Job queryJob = bigquery().create(JobInfo.of(queryJobConfig));
+ Job completedJob = waitForJob(queryJob);
+ TableInfo expiry =
+ bigquery()
+ .getTable(queryJobConfig.getDestinationTable())
+ .toBuilder()
+ .setExpirationTime(System.currentTimeMillis() + TEMP_TABLE_EXPIRY_DURATION_MS)
+ .build();
+ bigquery().update(expiry);
+ queryJobConfig = completedJob.getConfiguration();
+ return queryJobConfig.getDestinationTable();
+ } catch (InterruptedException | BigQueryException e) {
+ throw Status.INTERNAL
+ .withDescription("Failed to load entity dataset into store")
+ .withCause(e)
+ .asRuntimeException();
+ }
+ }
+
+ private FieldValueList getTimestampLimits(String entityTableName) {
+ QueryJobConfiguration getTimestampLimitsQuery =
+ QueryJobConfiguration.newBuilder(createTimestampLimitQuery(entityTableName))
+ .setDefaultDataset(DatasetId.of(projectId(), datasetId()))
+ .setDestinationTable(TableId.of(projectId(), datasetId(), createTempTableName()))
+ .build();
+ try {
+ Job job = bigquery().create(JobInfo.of(getTimestampLimitsQuery));
+ TableResult getTimestampLimitsQueryResult = waitForJob(job).getQueryResults();
+ TableInfo expiry =
+ bigquery()
+ .getTable(getTimestampLimitsQuery.getDestinationTable())
+ .toBuilder()
+ .setExpirationTime(System.currentTimeMillis() + TEMP_TABLE_EXPIRY_DURATION_MS)
+ .build();
+ bigquery().update(expiry);
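+ // The timestamp limit query returns a single row containing the min and max event timestamps.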
+ FieldValueList result = null;
+ for (FieldValueList fields : getTimestampLimitsQueryResult.getValues()) {
+ result = fields;
+ }
+ if (result == null || result.get("min").isNull() || result.get("max").isNull()) {
+ throw new RuntimeException("query returned insufficient values");
+ }
+ return result;
+ } catch (InterruptedException e) {
+ throw Status.INTERNAL
+ .withDescription("Unable to extract min and max timestamps from query")
+ .withCause(e)
+ .asRuntimeException();
+ }
+ }
+
+ private Table loadEntities(ServingAPIProto.DatasetSource datasetSource) {
+ Table loadedEntityTable;
+ switch (datasetSource.getDatasetSourceCase()) {
+ case FILE_SOURCE:
+ try {
+ // Currently only AVRO format is supported
+ if (datasetSource.getFileSource().getDataFormat()
+ != ServingAPIProto.DataFormat.DATA_FORMAT_AVRO) {
+ throw Status.INVALID_ARGUMENT
+ .withDescription("Invalid file format, only AVRO is supported.")
+ .asRuntimeException();
+ }
+
+ TableId tableId = TableId.of(projectId(), datasetId(), createTempTableName());
+ log.info(
+ "Loading entity rows to: {}.{}.{}", projectId(), datasetId(), tableId.getTable());
+
+ LoadJobConfiguration loadJobConfiguration =
+ LoadJobConfiguration.of(
+ tableId, datasetSource.getFileSource().getFileUrisList(), FormatOptions.avro());
+ loadJobConfiguration =
+ loadJobConfiguration.toBuilder().setUseAvroLogicalTypes(true).build();
+ Job job = bigquery().create(JobInfo.of(loadJobConfiguration));
+ waitForJob(job);
+
+ TableInfo expiry =
+ bigquery()
+ .getTable(tableId)
+ .toBuilder()
+ .setExpirationTime(System.currentTimeMillis() + TEMP_TABLE_EXPIRY_DURATION_MS)
+ .build();
+ bigquery().update(expiry);
+
+ loadedEntityTable = bigquery().getTable(tableId);
+ if (!loadedEntityTable.exists()) {
+ throw new RuntimeException(
+ "Unable to create entity dataset table, table already exists");
+ }
+ return loadedEntityTable;
+ } catch (Exception e) {
+ log.error("Exception has occurred in loadEntities method: ", e);
+ throw Status.INTERNAL
+ .withDescription("Failed to load entity dataset into store: " + e.toString())
+ .withCause(e)
+ .asRuntimeException();
+ }
+ case DATASETSOURCE_NOT_SET:
+ default:
+ throw Status.INVALID_ARGUMENT
+ .withDescription("Data source must be set.")
+ .asRuntimeException();
+ }
+ }
+
+ private List<String> generateQueries(
+ String entityTableName,
+ FieldValueList timestampLimits,
+ List<FeatureSetQueryInfo> featureSetQueryInfos) {
+ List<String> featureSetQueries = new ArrayList<>();
+ try {
+ for (FeatureSetQueryInfo featureSetInfo : featureSetQueryInfos) {
+ String query =
+ QueryTemplater.createFeatureSetPointInTimeQuery(
+ featureSetInfo,
+ projectId(),
+ datasetId(),
+ entityTableName,
+ timestampLimits.get("min").getStringValue(),
+ timestampLimits.get("max").getStringValue());
+ featureSetQueries.add(query);
+ }
+ } catch (IOException e) {
+ throw Status.INTERNAL
+ .withDescription("Unable to generate query for batch retrieval")
+ .withCause(e)
+ .asRuntimeException();
+ }
+ return featureSetQueries;
+ }
+
+ Job runBatchQuery(
+ String entityTableName,
+ List<String> entityTableColumnNames,
+ List<FeatureSetQueryInfo> featureSetQueryInfos,
+ List<String> featureSetQueries)
+ throws BigQueryException, InterruptedException, IOException {
+ ExecutorService executorService = Executors.newFixedThreadPool(featureSetQueries.size());
+ ExecutorCompletionService<FeatureSetQueryInfo> executorCompletionService =
+ new ExecutorCompletionService<>(executorService);
+
+ // For each of the feature sets requested, start an async job joining the features in that
+ // feature set to the provided entity table
+ for (int i = 0; i < featureSetQueries.size(); i++) {
+ QueryJobConfiguration queryJobConfig =
+ QueryJobConfiguration.newBuilder(featureSetQueries.get(i))
+ .setDestinationTable(TableId.of(projectId(), datasetId(), createTempTableName()))
+ .build();
+ Job subqueryJob = bigquery().create(JobInfo.of(queryJobConfig));
+ executorCompletionService.submit(
+ SubqueryCallable.builder()
+ .setBigquery(bigquery())
+ .setFeatureSetInfo(featureSetQueryInfos.get(i))
+ .setSubqueryJob(subqueryJob)
+ .build());
+ }
+
+ List<FeatureSetQueryInfo> completedFeatureSetQueryInfos = new ArrayList<>();
+
+ for (int i = 0; i < featureSetQueries.size(); i++) {
+ try {
+ // Try to retrieve the outputs of all the jobs. The timeout here is a formality;
+ // a stricter timeout is implemented in the actual SubqueryCallable.
+ FeatureSetQueryInfo featureSetInfo =
+ executorCompletionService.take().get(SUBQUERY_TIMEOUT_SECS, TimeUnit.SECONDS);
+ completedFeatureSetQueryInfos.add(featureSetInfo);
+ } catch (InterruptedException | ExecutionException | TimeoutException e) {
+ executorService.shutdownNow();
+ throw Status.INTERNAL
+ .withDescription("Error running batch query")
+ .withCause(e)
+ .asRuntimeException();
+ }
+ }
+
+ // Generate and run a join query to collect the outputs of all the
+ // subqueries into a single table.
+ String joinQuery =
+ QueryTemplater.createJoinQuery(
+ completedFeatureSetQueryInfos, entityTableColumnNames, entityTableName);
+ QueryJobConfiguration queryJobConfig =
+ QueryJobConfiguration.newBuilder(joinQuery)
+ .setDestinationTable(TableId.of(projectId(), datasetId(), createTempTableName()))
+ .build();
+ Job queryJob = bigquery().create(JobInfo.of(queryJobConfig));
+ Job completedQueryJob = waitForJob(queryJob);
+
+ TableInfo expiry =
+ bigquery()
+ .getTable(queryJobConfig.getDestinationTable())
+ .toBuilder()
+ .setExpirationTime(System.currentTimeMillis() + TEMP_TABLE_EXPIRY_DURATION_MS)
+ .build();
+ bigquery().update(expiry);
+
+ return completedQueryJob;
+ }
+
+ private List<String> parseOutputFileURIs(String feastJobId) {
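+ // For example (illustrative values): a staging location of "gs://some-bucket/feast-staging"
+ // yields scheme "gs", bucket "some-bucket" and prefix "feast-staging/<feastJobId>/".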
+ String scheme = jobStagingLocation().substring(0, jobStagingLocation().indexOf("://"));
+ String stagingLocationNoScheme =
+ jobStagingLocation().substring(jobStagingLocation().indexOf("://") + 3);
+ String bucket = stagingLocationNoScheme.split("/")[0];
+ List<String> prefixParts = new ArrayList<>();
+ prefixParts.add(
+ stagingLocationNoScheme.contains("/") && !stagingLocationNoScheme.endsWith("/")
+ ? stagingLocationNoScheme.substring(stagingLocationNoScheme.indexOf("/") + 1)
+ : "");
+ prefixParts.add(feastJobId);
+ String prefix = String.join("/", prefixParts) + "/";
+
+ List<String> fileUris = new ArrayList<>();
+ for (Blob blob : storage().list(bucket, Storage.BlobListOption.prefix(prefix)).iterateAll()) {
+ fileUris.add(String.format("%s://%s/%s", scheme, blob.getBucket(), blob.getName()));
+ }
+ return fileUris;
+ }
+
+ private Job waitForJob(Job queryJob) throws InterruptedException {
+ Job completedJob =
+ queryJob.waitFor(
+ RetryOption.initialRetryDelay(Duration.ofSeconds(initialRetryDelaySecs())),
+ RetryOption.totalTimeout(Duration.ofSeconds(totalTimeoutSecs())));
+ if (completedJob == null) {
+ throw Status.INTERNAL.withDescription("Job no longer exists").asRuntimeException();
+ } else if (completedJob.getStatus().getError() != null) {
+ throw Status.INTERNAL
+ .withDescription("Job failed: " + completedJob.getStatus().getError())
+ .asRuntimeException();
+ }
+ return completedJob;
+ }
+
+ public String generateFullTableName(TableId tableId) {
+ return String.format(
+ "%s.%s.%s", tableId.getProject(), tableId.getDataset(), tableId.getTable());
+ }
+
+ public String createTempTableName() {
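+ // Prefix with an underscore and strip dashes so the random name is a valid BigQuery table ID.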
+ return "_" + UUID.randomUUID().toString().replace("-", "");
+ }
+}
diff --git a/storage/connectors/bigquery/src/main/java/feast/storage/connectors/bigquery/retrieval/FeatureSetQueryInfo.java b/storage/connectors/bigquery/src/main/java/feast/storage/connectors/bigquery/retrieval/FeatureSetQueryInfo.java
new file mode 100644
index 0000000000..4938c91161
--- /dev/null
+++ b/storage/connectors/bigquery/src/main/java/feast/storage/connectors/bigquery/retrieval/FeatureSetQueryInfo.java
@@ -0,0 +1,86 @@
+/*
+ * SPDX-License-Identifier: Apache-2.0
+ * Copyright 2018-2019 The Feast Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package feast.storage.connectors.bigquery.retrieval;
+
+import java.util.List;
+
+public class FeatureSetQueryInfo {
+
+ private final String project;
+ private final String name;
+ private final int version;
+ private final long maxAge;
+ private final List<String> entities;
+ private final List<String> features;
+ private final String table;
+
+ public FeatureSetQueryInfo(
+ String project,
+ String name,
+ int version,
+ long maxAge,
+ List<String> entities,
+ List<String> features,
+ String table) {
+ this.project = project;
+ this.name = name;
+ this.version = version;
+ this.maxAge = maxAge;
+ this.entities = entities;
+ this.features = features;
+ this.table = table;
+ }
+
+ public FeatureSetQueryInfo(FeatureSetQueryInfo featureSetInfo, String table) {
+
+ this.project = featureSetInfo.getProject();
+ this.name = featureSetInfo.getName();
+ this.version = featureSetInfo.getVersion();
+ this.maxAge = featureSetInfo.getMaxAge();
+ this.entities = featureSetInfo.getEntities();
+ this.features = featureSetInfo.getFeatures();
+ this.table = table;
+ }
+
+ public String getProject() {
+ return project;
+ }
+
+ public String getName() {
+ return name;
+ }
+
+ public int getVersion() {
+ return version;
+ }
+
+ public long getMaxAge() {
+ return maxAge;
+ }
+
+ public List<String> getEntities() {
+ return entities;
+ }
+
+ public List<String> getFeatures() {
+ return features;
+ }
+
+ public String getTable() {
+ return table;
+ }
+}
diff --git a/storage/connectors/bigquery/src/main/java/feast/storage/connectors/bigquery/retrieval/QueryTemplater.java b/storage/connectors/bigquery/src/main/java/feast/storage/connectors/bigquery/retrieval/QueryTemplater.java
new file mode 100644
index 0000000000..3d87da1d80
--- /dev/null
+++ b/storage/connectors/bigquery/src/main/java/feast/storage/connectors/bigquery/retrieval/QueryTemplater.java
@@ -0,0 +1,161 @@
+/*
+ * SPDX-License-Identifier: Apache-2.0
+ * Copyright 2018-2019 The Feast Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package feast.storage.connectors.bigquery.retrieval;
+
+import com.google.cloud.bigquery.TableId;
+import com.google.protobuf.Duration;
+import com.mitchellbosecke.pebble.PebbleEngine;
+import com.mitchellbosecke.pebble.template.PebbleTemplate;
+import feast.core.FeatureSetProto.EntitySpec;
+import feast.core.FeatureSetProto.FeatureSetSpec;
+import feast.serving.ServingAPIProto.FeatureReference;
+import feast.storage.api.retrieval.FeatureSetRequest;
+import java.io.IOException;
+import java.io.StringWriter;
+import java.io.Writer;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+public class QueryTemplater {
+
+ private static final PebbleEngine engine = new PebbleEngine.Builder().build();
+ private static final String FEATURESET_TEMPLATE_NAME = "templates/single_featureset_pit_join.sql";
+ private static final String JOIN_TEMPLATE_NAME = "templates/join_featuresets.sql";
+
+ /**
+ * Get the query for retrieving the earliest and latest timestamps in the entity dataset.
+ *
+ * @param leftTableName full entity dataset name
+ * @return timestamp limit BQ SQL query
+ */
+ public static String createTimestampLimitQuery(String leftTableName) {
+ return String.format(
+ "SELECT DATETIME(MAX(event_timestamp)) as max, DATETIME(MIN(event_timestamp)) as min FROM `%s`",
+ leftTableName);
+ }
+
+ /**
+ * Creates a query that generates a UUID for the entity table, for left joins later on.
+ *
+ * @param leftTableName full entity dataset name
+ * @return uuid generation query
+ */
+ public static String createEntityTableUUIDQuery(String leftTableName) {
+ return String.format(
+ "SELECT GENERATE_UUID() as uuid, `%s`.* from `%s`", leftTableName, leftTableName);
+ }
+
+ /**
+ * Generate the information necessary for templating the point-in-time-correctness join of each
+ * requested feature set to the entity dataset.
+ *
+ * @param featureSetRequests List of {@link FeatureSetRequest} containing a {@link FeatureSetSpec}
+ * and its corresponding {@link FeatureReference}s provided by the user.
+ * @return List of FeatureSetInfos
+ */
+ public static List<FeatureSetQueryInfo> getFeatureSetInfos(
+ List<FeatureSetRequest> featureSetRequests) throws IllegalArgumentException {
+
+ List<FeatureSetQueryInfo> featureSetInfos = new ArrayList<>();
+ for (FeatureSetRequest featureSetRequest : featureSetRequests) {
+ FeatureSetSpec spec = featureSetRequest.getSpec();
+ Duration maxAge = spec.getMaxAge();
+ List<String> fsEntities =
+ spec.getEntitiesList().stream().map(EntitySpec::getName).collect(Collectors.toList());
+ List<String> features =
+ featureSetRequest.getFeatureReferences().stream()
+ .map(FeatureReference::getName)
+ .collect(Collectors.toList());
+ featureSetInfos.add(
+ new FeatureSetQueryInfo(
+ spec.getProject(),
+ spec.getName(),
+ spec.getVersion(),
+ maxAge.getSeconds(),
+ fsEntities,
+ features,
+ ""));
+ }
+ return featureSetInfos;
+ }
+
+ /**
+ * Generate the query for point in time correctness join of data for a single feature set to the
+ * entity dataset.
+ *
+ * @param featureSetInfo Information about the feature set necessary for the query templating
+ * @param projectId google project ID
+ * @param datasetId feast bigquery dataset ID
+ * @param leftTableName entity dataset name
+ * @param minTimestamp earliest allowed timestamp for the historical data in feast
+ * @param maxTimestamp latest allowed timestamp for the historical data in feast
+ * @return point in time correctness join BQ SQL query
+ */
+ public static String createFeatureSetPointInTimeQuery(
+ FeatureSetQueryInfo featureSetInfo,
+ String projectId,
+ String datasetId,
+ String leftTableName,
+ String minTimestamp,
+ String maxTimestamp)
+ throws IOException {
+
+ PebbleTemplate template = engine.getTemplate(FEATURESET_TEMPLATE_NAME);
+ Map<String, Object> context = new HashMap<>();
+ context.put("featureSet", featureSetInfo);
+ context.put("projectId", projectId);
+ context.put("datasetId", datasetId);
+ context.put("minTimestamp", minTimestamp);
+ context.put("maxTimestamp", maxTimestamp);
+ context.put("leftTableName", leftTableName);
+
+ Writer writer = new StringWriter();
+ template.evaluate(writer, context);
+ return writer.toString();
+ }
+
+ /**
+ * @param featureSetInfos List of FeatureSetInfos containing information about the feature set
+ * necessary for the query templating
+ * @param entityTableColumnNames list of column names in entity table
+ * @param leftTableName entity dataset name
+ * @return query to join temporary feature set tables to the entity table
+ */
+ public static String createJoinQuery(
+ List<FeatureSetQueryInfo> featureSetInfos,
+ List<String> entityTableColumnNames,
+ String leftTableName)
+ throws IOException {
+ PebbleTemplate template = engine.getTemplate(JOIN_TEMPLATE_NAME);
+ Map<String, Object> context = new HashMap<>();
+ context.put("entities", entityTableColumnNames);
+ context.put("featureSets", featureSetInfos);
+ context.put("leftTableName", leftTableName);
+
+ Writer writer = new StringWriter();
+ template.evaluate(writer, context);
+ return writer.toString();
+ }
+
+ public static String generateFullTableName(TableId tableId) {
+ return String.format(
+ "%s.%s.%s", tableId.getProject(), tableId.getDataset(), tableId.getTable());
+ }
+}
diff --git a/storage/connectors/bigquery/src/main/java/feast/storage/connectors/bigquery/retrieval/SubqueryCallable.java b/storage/connectors/bigquery/src/main/java/feast/storage/connectors/bigquery/retrieval/SubqueryCallable.java
new file mode 100644
index 0000000000..4a36da2122
--- /dev/null
+++ b/storage/connectors/bigquery/src/main/java/feast/storage/connectors/bigquery/retrieval/SubqueryCallable.java
@@ -0,0 +1,74 @@
+/*
+ * SPDX-License-Identifier: Apache-2.0
+ * Copyright 2018-2019 The Feast Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package feast.storage.connectors.bigquery.retrieval;
+
+import static feast.storage.connectors.bigquery.retrieval.BigQueryBatchRetriever.TEMP_TABLE_EXPIRY_DURATION_MS;
+import static feast.storage.connectors.bigquery.retrieval.QueryTemplater.generateFullTableName;
+
+import com.google.auto.value.AutoValue;
+import com.google.cloud.bigquery.*;
+import java.util.concurrent.Callable;
+
+/**
+ * Waits for a point-in-time correctness join to complete. On completion, returns a featureSetInfo
+ * updated with the reference to the table containing the results of the query.
+ */
+@AutoValue
+ public abstract class SubqueryCallable implements Callable<FeatureSetQueryInfo> {
+
+ public abstract BigQuery bigquery();
+
+ public abstract FeatureSetQueryInfo featureSetInfo();
+
+ public abstract Job subqueryJob();
+
+ public static Builder builder() {
+ return new AutoValue_SubqueryCallable.Builder();
+ }
+
+ @AutoValue.Builder
+ public abstract static class Builder {
+
+ public abstract Builder setBigquery(BigQuery bigquery);
+
+ public abstract Builder setFeatureSetInfo(FeatureSetQueryInfo featureSetInfo);
+
+ public abstract Builder setSubqueryJob(Job subqueryJob);
+
+ public abstract SubqueryCallable build();
+ }
+
+ @Override
+ public FeatureSetQueryInfo call() throws BigQueryException, InterruptedException {
+ QueryJobConfiguration subqueryConfig;
+ subqueryJob().waitFor();
+ subqueryConfig = subqueryJob().getConfiguration();
+ TableId destinationTable = subqueryConfig.getDestinationTable();
+
+ TableInfo expiry =
+ bigquery()
+ .getTable(destinationTable)
+ .toBuilder()
+ .setExpirationTime(System.currentTimeMillis() + TEMP_TABLE_EXPIRY_DURATION_MS)
+ .build();
+ bigquery().update(expiry);
+
+ String fullTablePath = generateFullTableName(destinationTable);
+
+ return new FeatureSetQueryInfo(featureSetInfo(), fullTablePath);
+ }
+}
diff --git a/storage/connectors/bigquery/src/main/java/feast/storage/connectors/bigquery/write/BigQueryDeadletterSink.java b/storage/connectors/bigquery/src/main/java/feast/storage/connectors/bigquery/write/BigQueryDeadletterSink.java
new file mode 100644
index 0000000000..91e3c2fd31
--- /dev/null
+++ b/storage/connectors/bigquery/src/main/java/feast/storage/connectors/bigquery/write/BigQueryDeadletterSink.java
@@ -0,0 +1,128 @@
+/*
+ * SPDX-License-Identifier: Apache-2.0
+ * Copyright 2018-2020 The Feast Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package feast.storage.connectors.bigquery.write;
+
+import com.google.api.services.bigquery.model.TableRow;
+import com.google.auto.value.AutoValue;
+import com.google.common.io.Resources;
+import feast.storage.api.write.DeadletterSink;
+import feast.storage.api.write.FailedElement;
+import java.nio.charset.StandardCharsets;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PDone;
+import org.slf4j.Logger;
+
+public class BigQueryDeadletterSink implements DeadletterSink {
+
+ private static final String DEADLETTER_SCHEMA_FILE_PATH = "schemas/deadletter_table_schema.json";
+ private static final Logger log = org.slf4j.LoggerFactory.getLogger(BigQueryDeadletterSink.class);
+
+ private final String tableSpec;
+ private String jsonSchema;
+
+ public BigQueryDeadletterSink(String tableSpec) {
+
+ this.tableSpec = tableSpec;
+ try {
+ jsonSchema =
+ Resources.toString(
+ Resources.getResource(DEADLETTER_SCHEMA_FILE_PATH), StandardCharsets.UTF_8);
+ } catch (Exception e) {
+ log.error(
+ "Unable to read {} file from the resources folder!", DEADLETTER_SCHEMA_FILE_PATH, e);
+ }
+ }
+
+ @Override
+ public void prepareWrite() {}
+
+ @Override
+ public PTransform<PCollection<FailedElement>, PDone> write() {
+ return WriteFailedElement.newBuilder()
+ .setJsonSchema(jsonSchema)
+ .setTableSpec(tableSpec)
+ .build();
+ }
+
+ @AutoValue
+ public abstract static class WriteFailedElement
+ extends PTransform<PCollection<FailedElement>, PDone> {
+
+ public abstract String getTableSpec();
+
+ public abstract String getJsonSchema();
+
+ public static Builder newBuilder() {
+ return new AutoValue_BigQueryDeadletterSink_WriteFailedElement.Builder();
+ }
+
+ @AutoValue.Builder
+ public abstract static class Builder {
+
+ /**
+ * @param tableSpec Table spec should follow the format "PROJECT_ID:DATASET_ID.TABLE_ID".
+ * The table will be created if it does not exist.
+ */
+ public abstract Builder setTableSpec(String tableSpec);
+
+ /**
+ * @param jsonSchema JSON string describing the schema
+ * of the table.
+ */
+ public abstract Builder setJsonSchema(String jsonSchema);
+
+ public abstract WriteFailedElement build();
+ }
+
+ @Override
+ public PDone expand(PCollection<FailedElement> input) {
+ input
+ .apply("FailedElementToTableRow", ParDo.of(new FailedElementToTableRowFn()))
+ .apply(
+ "WriteFailedElementsToBigQuery",
+ BigQueryIO.writeTableRows()
+ .to(getTableSpec())
+ .withJsonSchema(getJsonSchema())
+ .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
+ .withWriteDisposition(WriteDisposition.WRITE_APPEND));
+ return PDone.in(input.getPipeline());
+ }
+ }
+
+ public static class FailedElementToTableRowFn extends DoFn<FailedElement, TableRow> {
+ @ProcessElement
+ public void processElement(ProcessContext context) {
+ final FailedElement element = context.element();
+ final TableRow tableRow =
+ new TableRow()
+ .set("timestamp", element.getTimestamp().toString())
+ .set("job_name", element.getJobName())
+ .set("transform_name", element.getTransformName())
+ .set("payload", element.getPayload())
+ .set("error_message", element.getErrorMessage())
+ .set("stack_trace", element.getStackTrace());
+ context.output(tableRow);
+ }
+ }
+}
diff --git a/storage/connectors/bigquery/src/main/java/feast/storage/connectors/bigquery/write/BigQueryFeatureSink.java b/storage/connectors/bigquery/src/main/java/feast/storage/connectors/bigquery/write/BigQueryFeatureSink.java
new file mode 100644
index 0000000000..b38728fa96
--- /dev/null
+++ b/storage/connectors/bigquery/src/main/java/feast/storage/connectors/bigquery/write/BigQueryFeatureSink.java
@@ -0,0 +1,189 @@
+/*
+ * SPDX-License-Identifier: Apache-2.0
+ * Copyright 2018-2020 The Feast Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package feast.storage.connectors.bigquery.write;
+
+import com.google.auto.value.AutoValue;
+import com.google.cloud.bigquery.*;
+import com.google.common.collect.ImmutableMap;
+import feast.core.FeatureSetProto;
+import feast.core.StoreProto.Store.BigQueryConfig;
+import feast.storage.api.write.FeatureSink;
+import feast.storage.api.write.WriteResult;
+import feast.storage.connectors.bigquery.common.TypeUtil;
+import feast.storage.connectors.bigquery.write.AutoValue_BigQueryFeatureSink.Builder;
+import feast.types.FeatureRowProto;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import org.apache.beam.repackaged.core.org.apache.commons.lang3.tuple.Pair;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.values.PCollection;
+import org.slf4j.Logger;
+
+@AutoValue
+public abstract class BigQueryFeatureSink implements FeatureSink {
+ private static final Logger log = org.slf4j.LoggerFactory.getLogger(BigQueryFeatureSink.class);
+
+ // Column description for reserved fields
+ public static final String BIGQUERY_EVENT_TIMESTAMP_FIELD_DESCRIPTION =
+ "Event time for the FeatureRow";
+ public static final String BIGQUERY_CREATED_TIMESTAMP_FIELD_DESCRIPTION =
+ "Processing time of the FeatureRow ingestion in Feast\"";
+ public static final String BIGQUERY_JOB_ID_FIELD_DESCRIPTION =
+ "Feast import job ID for the FeatureRow";
+
+ public abstract String getProjectId();
+
+ public abstract String getDatasetId();
+
+ public abstract BigQuery getBigQuery();
+
+ /**
+ * Initialize a {@link BigQueryFeatureSink} from a {@link BigQueryConfig}. This method
+ * initializes a {@link BigQuery} client with default options. Use the builder method to inject
+ * your own client.
+ *
+ * @param config {@link BigQueryConfig}
+ * @return {@link BigQueryFeatureSink}
+ */
+ public static BigQueryFeatureSink fromConfig(BigQueryConfig config) {
+ return builder()
+ .setDatasetId(config.getDatasetId())
+ .setProjectId(config.getProjectId())
+ .setBigQuery(BigQueryOptions.getDefaultInstance().getService())
+ .build();
+ }
+
+ public static Builder builder() {
+ return new AutoValue_BigQueryFeatureSink.Builder();
+ }
+
+ @AutoValue.Builder
+ public abstract static class Builder {
+
+ public abstract Builder setProjectId(String projectId);
+
+ public abstract Builder setDatasetId(String datasetId);
+
+ public abstract Builder setBigQuery(BigQuery bigQuery);
+
+ public abstract BigQueryFeatureSink build();
+ }
+
+ /** @param featureSet Feature set to be written */
+ @Override
+ public void prepareWrite(FeatureSetProto.FeatureSet featureSet) {
+ BigQuery bigquery = getBigQuery();
+ FeatureSetProto.FeatureSetSpec featureSetSpec = featureSet.getSpec();
+
+ DatasetId datasetId = DatasetId.of(getProjectId(), getDatasetId());
+ if (bigquery.getDataset(datasetId) == null) {
+ log.info(
+ "Creating dataset '{}' in project '{}'", datasetId.getDataset(), datasetId.getProject());
+ bigquery.create(DatasetInfo.of(datasetId));
+ }
+ String tableName =
+ String.format(
+ "%s_%s_v%d",
+ featureSetSpec.getProject(), featureSetSpec.getName(), featureSetSpec.getVersion())
+ .replaceAll("-", "_");
+ TableId tableId = TableId.of(datasetId.getProject(), datasetId.getDataset(), tableName);
+
+ // Return if there is an existing table
+ Table table = bigquery.getTable(tableId);
+ if (table != null) {
+ log.info(
+ "Writing to existing BigQuery table '{}:{}.{}'",
+ getProjectId(),
+ datasetId.getDataset(),
+ tableName);
+ return;
+ }
+
+ log.info(
+ "Creating table '{}' in dataset '{}' in project '{}'",
+ tableId.getTable(),
+ datasetId.getDataset(),
+ datasetId.getProject());
+ TableDefinition tableDefinition = createBigQueryTableDefinition(featureSet.getSpec());
+ TableInfo tableInfo = TableInfo.of(tableId, tableDefinition);
+ bigquery.create(tableInfo);
+ }
+
+ @Override
+ public PTransform<PCollection<FeatureRowProto.FeatureRow>, WriteResult> write() {
+ return new BigQueryWrite(DatasetId.of(getProjectId(), getDatasetId()));
+ }
+
+ private TableDefinition createBigQueryTableDefinition(FeatureSetProto.FeatureSetSpec spec) {
+ List<Field> fields = new ArrayList<>();
+ log.info("Table will have the following fields:");
+
+ for (FeatureSetProto.EntitySpec entitySpec : spec.getEntitiesList()) {
+ Field.Builder builder =
+ Field.newBuilder(
+ entitySpec.getName(), TypeUtil.toStandardSqlType(entitySpec.getValueType()));
+ if (entitySpec.getValueType().name().toLowerCase().endsWith("_list")) {
+ builder.setMode(Field.Mode.REPEATED);
+ }
+ Field field = builder.build();
+ log.info("- {}", field.toString());
+ fields.add(field);
+ }
+ for (FeatureSetProto.FeatureSpec featureSpec : spec.getFeaturesList()) {
+ Field.Builder builder =
+ Field.newBuilder(
+ featureSpec.getName(), TypeUtil.toStandardSqlType(featureSpec.getValueType()));
+ if (featureSpec.getValueType().name().toLowerCase().endsWith("_list")) {
+ builder.setMode(Field.Mode.REPEATED);
+ }
+ Field field = builder.build();
+ log.info("- {}", field.toString());
+ fields.add(field);
+ }
+
+ // Refer to protos/feast/core/Store.proto for reserved fields in BigQuery.
+ Map<String, Pair<StandardSQLTypeName, String>>
+ reservedFieldNameToPairOfStandardSQLTypeAndDescription =
+ ImmutableMap.of(
+ "event_timestamp",
+ Pair.of(StandardSQLTypeName.TIMESTAMP, BIGQUERY_EVENT_TIMESTAMP_FIELD_DESCRIPTION),
+ "created_timestamp",
+ Pair.of(
+ StandardSQLTypeName.TIMESTAMP, BIGQUERY_CREATED_TIMESTAMP_FIELD_DESCRIPTION),
+ "job_id",
+ Pair.of(StandardSQLTypeName.STRING, BIGQUERY_JOB_ID_FIELD_DESCRIPTION));
+ for (Map.Entry<String, Pair<StandardSQLTypeName, String>> entry :
+ reservedFieldNameToPairOfStandardSQLTypeAndDescription.entrySet()) {
+ Field field =
+ Field.newBuilder(entry.getKey(), entry.getValue().getLeft())
+ .setDescription(entry.getValue().getRight())
+ .build();
+ log.info("- {}", field.toString());
+ fields.add(field);
+ }
+
+ TimePartitioning timePartitioning =
+ TimePartitioning.newBuilder(TimePartitioning.Type.DAY).setField("event_timestamp").build();
+ log.info("Table partitioning: " + timePartitioning.toString());
+
+ return StandardTableDefinition.newBuilder()
+ .setTimePartitioning(timePartitioning)
+ .setSchema(Schema.of(fields))
+ .build();
+ }
+}
diff --git a/storage/connectors/bigquery/src/main/java/feast/storage/connectors/bigquery/write/BigQueryWrite.java b/storage/connectors/bigquery/src/main/java/feast/storage/connectors/bigquery/write/BigQueryWrite.java
new file mode 100644
index 0000000000..5e815881e6
--- /dev/null
+++ b/storage/connectors/bigquery/src/main/java/feast/storage/connectors/bigquery/write/BigQueryWrite.java
@@ -0,0 +1,107 @@
+/*
+ * SPDX-License-Identifier: Apache-2.0
+ * Copyright 2018-2020 The Feast Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package feast.storage.connectors.bigquery.write;
+
+import com.google.api.services.bigquery.model.TableDataInsertAllResponse;
+import com.google.api.services.bigquery.model.TableRow;
+import com.google.cloud.bigquery.DatasetId;
+import feast.storage.api.write.FailedElement;
+import feast.storage.api.write.WriteResult;
+import feast.types.FeatureRowProto;
+import java.io.IOException;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryInsertError;
+import org.apache.beam.sdk.io.gcp.bigquery.InsertRetryPolicy;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.PCollection;
+import org.slf4j.Logger;
+
+/**
+ * A {@link PTransform} that writes {@link FeatureRowProto FeatureRows} to the specified BigQuery
+ * dataset, and returns a {@link WriteResult} containing the unsuccessful writes. Since Bigquery
+ * does not output successful writes, we cannot emit those, and so no success metrics will be
+ * captured if this sink is used.
+ */
+public class BigQueryWrite
+ extends PTransform<PCollection<FeatureRowProto.FeatureRow>, WriteResult> {
+ private static final Logger log = org.slf4j.LoggerFactory.getLogger(BigQueryWrite.class);
+
+ // Destination dataset
+ private DatasetId destination;
+
+ public BigQueryWrite(DatasetId destination) {
+ this.destination = destination;
+ }
+
+ @Override
+ public WriteResult expand(PCollection<FeatureRowProto.FeatureRow> input) {
+ String jobName = input.getPipeline().getOptions().getJobName();
+ org.apache.beam.sdk.io.gcp.bigquery.WriteResult bigqueryWriteResult =
+ input.apply(
+ "WriteTableRowToBigQuery",
+ BigQueryIO.<FeatureRowProto.FeatureRow>write()
+ .to(new GetTableDestination(destination.getProject(), destination.getDataset()))
+ .withFormatFunction(new FeatureRowToTableRow(jobName))
+ .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER)
+ .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
+ .withExtendedErrorInfo()
+ .withMethod(BigQueryIO.Write.Method.STREAMING_INSERTS)
+ .withFailedInsertRetryPolicy(InsertRetryPolicy.retryTransientErrors()));
+
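+ // Rows that still fail after transient-error retries are returned via getFailedInsertsWithErr()
+ // and wrapped as FailedElements here.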
+ PCollection<FailedElement> failedElements =
+ bigqueryWriteResult
+ .getFailedInsertsWithErr()
+ .apply(
+ "WrapBigQueryInsertionError",
+ ParDo.of(
+ new DoFn<BigQueryInsertError, FailedElement>() {
+ @ProcessElement
+ public void processElement(ProcessContext context) {
+ TableDataInsertAllResponse.InsertErrors error =
+ context.element().getError();
+ TableRow row = context.element().getRow();
+ try {
+ context.output(
+ FailedElement.newBuilder()
+ .setErrorMessage(error.toPrettyString())
+ .setPayload(row.toPrettyString())
+ .setJobName(context.getPipelineOptions().getJobName())
+ .setTransformName("WriteTableRowToBigQuery")
+ .build());
+ } catch (IOException e) {
+ log.error(e.getMessage());
+ }
+ }
+ }));
+
+ // Since BigQueryIO does not support emitting successful writes, we set successfulInserts to
+ // an empty stream, and no success metrics will be collected.
+ PCollection<FeatureRowProto.FeatureRow> successfulInserts =
+ input.apply(
+ "dummy",
+ ParDo.of(
+ new DoFn<FeatureRowProto.FeatureRow, FeatureRowProto.FeatureRow>() {
+ @ProcessElement
+ public void processElement(ProcessContext context) {}
+ }));
+
+ return WriteResult.in(input.getPipeline(), successfulInserts, failedElements);
+ }
+}
diff --git a/storage/connectors/bigquery/src/main/java/feast/storage/connectors/bigquery/write/FeatureRowToTableRow.java b/storage/connectors/bigquery/src/main/java/feast/storage/connectors/bigquery/write/FeatureRowToTableRow.java
new file mode 100644
index 0000000000..4f84572c1b
--- /dev/null
+++ b/storage/connectors/bigquery/src/main/java/feast/storage/connectors/bigquery/write/FeatureRowToTableRow.java
@@ -0,0 +1,109 @@
+/*
+ * SPDX-License-Identifier: Apache-2.0
+ * Copyright 2018-2019 The Feast Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package feast.storage.connectors.bigquery.write;
+
+import com.google.api.services.bigquery.model.TableRow;
+import com.google.protobuf.util.Timestamps;
+import feast.types.FeatureRowProto.FeatureRow;
+import feast.types.FieldProto.Field;
+import java.util.Base64;
+import java.util.stream.Collectors;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.joda.time.Instant;
+
+// TODO: Validate FeatureRow against FeatureSetSpec
+// i.e. that the value types in FeatureRow matches against those in FeatureSetSpec
+
+ public class FeatureRowToTableRow implements SerializableFunction<FeatureRow, TableRow> {
+ private static final String EVENT_TIMESTAMP_COLUMN = "event_timestamp";
+ private static final String CREATED_TIMESTAMP_COLUMN = "created_timestamp";
+ private static final String JOB_ID_COLUMN = "job_id";
+ private final String jobId;
+
+ public FeatureRowToTableRow(String jobId) {
+ this.jobId = jobId;
+ }
+
+ public static String getEventTimestampColumn() {
+ return EVENT_TIMESTAMP_COLUMN;
+ }
+
+ public TableRow apply(FeatureRow featureRow) {
+
+ TableRow tableRow = new TableRow();
+ tableRow.set(EVENT_TIMESTAMP_COLUMN, Timestamps.toString(featureRow.getEventTimestamp()));
+ tableRow.set(CREATED_TIMESTAMP_COLUMN, Instant.now().toString());
+ tableRow.set(JOB_ID_COLUMN, jobId);
+
+ for (Field field : featureRow.getFieldsList()) {
+ switch (field.getValue().getValCase()) {
+ case BYTES_VAL:
+ tableRow.set(
+ field.getName(),
+ Base64.getEncoder().encodeToString(field.getValue().getBytesVal().toByteArray()));
+ break;
+ case STRING_VAL:
+ tableRow.set(field.getName(), field.getValue().getStringVal());
+ break;
+ case INT32_VAL:
+ tableRow.set(field.getName(), field.getValue().getInt32Val());
+ break;
+ case INT64_VAL:
+ tableRow.set(field.getName(), field.getValue().getInt64Val());
+ break;
+ case DOUBLE_VAL:
+ tableRow.set(field.getName(), field.getValue().getDoubleVal());
+ break;
+ case FLOAT_VAL:
+ tableRow.set(field.getName(), field.getValue().getFloatVal());
+ break;
+ case BOOL_VAL:
+ tableRow.set(field.getName(), field.getValue().getBoolVal());
+ break;
+ case BYTES_LIST_VAL:
+ tableRow.set(
+ field.getName(),
+ field.getValue().getBytesListVal().getValList().stream()
+ .map(x -> Base64.getEncoder().encodeToString(x.toByteArray()))
+ .collect(Collectors.toList()));
+ break;
+ case STRING_LIST_VAL:
+ tableRow.set(field.getName(), field.getValue().getStringListVal().getValList());
+ break;
+ case INT32_LIST_VAL:
+ tableRow.set(field.getName(), field.getValue().getInt32ListVal().getValList());
+ break;
+ case INT64_LIST_VAL:
+ tableRow.set(field.getName(), field.getValue().getInt64ListVal().getValList());
+ break;
+ case DOUBLE_LIST_VAL:
+ tableRow.set(field.getName(), field.getValue().getDoubleListVal().getValList());
+ break;
+ case FLOAT_LIST_VAL:
+ tableRow.set(field.getName(), field.getValue().getFloatListVal().getValList());
+ break;
+ case BOOL_LIST_VAL:
+ tableRow.set(field.getName(), field.getValue().getBoolListVal().getValList());
+ break;
+ case VAL_NOT_SET:
+ break;
+ }
+ }
+
+ return tableRow;
+ }
+}
diff --git a/storage/connectors/bigquery/src/main/java/feast/storage/connectors/bigquery/write/GetTableDestination.java b/storage/connectors/bigquery/src/main/java/feast/storage/connectors/bigquery/write/GetTableDestination.java
new file mode 100644
index 0000000000..02c2fb1906
--- /dev/null
+++ b/storage/connectors/bigquery/src/main/java/feast/storage/connectors/bigquery/write/GetTableDestination.java
@@ -0,0 +1,52 @@
+/*
+ * SPDX-License-Identifier: Apache-2.0
+ * Copyright 2018-2019 The Feast Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package feast.storage.connectors.bigquery.write;
+
+import com.google.api.services.bigquery.model.TimePartitioning;
+import feast.types.FeatureRowProto.FeatureRow;
+import org.apache.beam.sdk.io.gcp.bigquery.TableDestination;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.values.ValueInSingleWindow;
+
+public class GetTableDestination
+ implements SerializableFunction<ValueInSingleWindow<FeatureRow>, TableDestination> {
+
+ private String projectId;
+ private String datasetId;
+
+ public GetTableDestination(String projectId, String datasetId) {
+ this.projectId = projectId;
+ this.datasetId = datasetId;
+ }
+
+ @Override
+ public TableDestination apply(ValueInSingleWindow<FeatureRow> input) {
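+ // The feature set reference on the row has the form "<project>/<name>:<version>"; it maps to
+ // the destination table "<project>_<name>_v<version>" in the configured dataset.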
+ String[] split = input.getValue().getFeatureSet().split(":");
+ String[] splitName = split[0].split("/");
+
+ TimePartitioning timePartitioning =
+ new TimePartitioning()
+ .setType("DAY")
+ .setField(FeatureRowToTableRow.getEventTimestampColumn());
+
+ return new TableDestination(
+ String.format(
+ "%s:%s.%s_%s_v%s", projectId, datasetId, splitName[0], splitName[1], split[1]),
+ String.format("Feast table for %s", input.getValue().getFeatureSet()),
+ timePartitioning);
+ }
+}
diff --git a/storage/connectors/bigquery/src/main/resources/schemas/deadletter_table_schema.json b/storage/connectors/bigquery/src/main/resources/schemas/deadletter_table_schema.json
new file mode 100644
index 0000000000..9238118907
--- /dev/null
+++ b/storage/connectors/bigquery/src/main/resources/schemas/deadletter_table_schema.json
@@ -0,0 +1,34 @@
+{
+ "fields": [
+ {
+ "name": "timestamp",
+ "type": "TIMESTAMP",
+ "mode": "REQUIRED"
+ },
+ {
+ "name": "job_name",
+ "type": "STRING",
+ "mode": "NULLABLE"
+ },
+ {
+ "name": "transform_name",
+ "type": "STRING",
+ "mode": "NULLABLE"
+ },
+ {
+ "name": "payload",
+ "type": "STRING",
+ "mode": "NULLABLE"
+ },
+ {
+ "name": "error_message",
+ "type": "STRING",
+ "mode": "NULLABLE"
+ },
+ {
+ "name": "stack_trace",
+ "type": "STRING",
+ "mode": "NULLABLE"
+ }
+ ]
+}
\ No newline at end of file
diff --git a/storage/connectors/bigquery/src/main/resources/templates/join_featuresets.sql b/storage/connectors/bigquery/src/main/resources/templates/join_featuresets.sql
new file mode 100644
index 0000000000..60b7c7d7a1
--- /dev/null
+++ b/storage/connectors/bigquery/src/main/resources/templates/join_featuresets.sql
@@ -0,0 +1,24 @@
+/*
+ Joins the outputs of multiple point-in-time-correctness joins to a single table.
+ */
+WITH joined as (
+SELECT * FROM `{{ leftTableName }}`
+{% for featureSet in featureSets %}
+LEFT JOIN (
+ SELECT
+ uuid,
+ {% for featureName in featureSet.features %}
+ {{ featureSet.project }}_{{ featureName }}_v{{ featureSet.version }}{% if loop.last %}{% else %}, {% endif %}
+ {% endfor %}
+ FROM `{{ featureSet.table }}`
+) USING (uuid)
+{% endfor %}
+) SELECT
+ event_timestamp,
+ {{ entities | join(', ') }}
+ {% for featureSet in featureSets %}
+ {% for featureName in featureSet.features %}
+ ,{{ featureSet.project }}_{{ featureName }}_v{{ featureSet.version }} as {{ featureName }}
+ {% endfor %}
+ {% endfor %}
+FROM joined
\ No newline at end of file
diff --git a/storage/connectors/bigquery/src/main/resources/templates/single_featureset_pit_join.sql b/storage/connectors/bigquery/src/main/resources/templates/single_featureset_pit_join.sql
new file mode 100644
index 0000000000..fb4c555b52
--- /dev/null
+++ b/storage/connectors/bigquery/src/main/resources/templates/single_featureset_pit_join.sql
@@ -0,0 +1,90 @@
+/*
+ This query template performs the point-in-time correctness join for a single feature set table
+ to the provided entity table.
+
+ 1. Concatenate the timestamp and entities from the feature set table with the entity dataset.
+ Feature values are joined to this table later for improved efficiency.
+ featureset_timestamp is equal to null in rows from the entity dataset.
+ */
+WITH union_features AS (
+SELECT
+ -- uuid is a unique identifier for each row in the entity dataset. Generated by `QueryTemplater.createEntityTableUUIDQuery`
+ uuid,
+ -- event_timestamp contains the timestamps to join onto
+ event_timestamp,
+ -- the feature_timestamp, i.e. the latest occurrence of the requested feature relative to the entity_dataset timestamp
+ NULL as {{ featureSet.project }}_{{ featureSet.name }}_v{{ featureSet.version }}_feature_timestamp,
+ -- created timestamp of the feature at the corresponding feature_timestamp
+ NULL as created_timestamp,
+ -- select only entities belonging to this feature set
+ {{ featureSet.entities | join(', ')}},
+ -- boolean for filtering the dataset later
+ true AS is_entity_table
+FROM `{{leftTableName}}`
+UNION ALL
+SELECT
+ NULL as uuid,
+ event_timestamp,
+ event_timestamp as {{ featureSet.project }}_{{ featureSet.name }}_v{{ featureSet.version }}_feature_timestamp,
+ created_timestamp,
+ {{ featureSet.entities | join(', ')}},
+ false AS is_entity_table
+FROM `{{projectId}}.{{datasetId}}.{{ featureSet.project }}_{{ featureSet.name }}_v{{ featureSet.version }}` WHERE event_timestamp <= '{{maxTimestamp}}'
+{% if featureSet.maxAge == 0 %}{% else %}AND event_timestamp >= Timestamp_sub(TIMESTAMP '{{ minTimestamp }}', interval {{ featureSet.maxAge }} second){% endif %}
+),
+/*
+ 2. Window the data in the unioned dataset, partitioning by entity and ordering by event_timestamp, as
+ well as is_entity_table.
+ Within each window, back-fill the feature_timestamp - as a result of this, the null feature_timestamps
+ in the rows from the entity table should now contain the latest timestamps relative to the row's
+ event_timestamp.
+
+ For rows where event_timestamp(provided datetime) - feature_timestamp > max age, set the
+ feature_timestamp to null.
+ */
+joined AS (
+SELECT
+ uuid,
+ event_timestamp,
+ {{ featureSet.entities | join(', ')}},
+ {% for featureName in featureSet.features %}
+ IF(event_timestamp >= {{ featureSet.project }}_{{ featureSet.name }}_v{{ featureSet.version }}_feature_timestamp {% if featureSet.maxAge == 0 %}{% else %}AND Timestamp_sub(event_timestamp, interval {{ featureSet.maxAge }} second) < {{ featureSet.project }}_{{ featureSet.name }}_v{{ featureSet.version }}_feature_timestamp{% endif %}, {{ featureSet.project }}_{{ featureName }}_v{{ featureSet.version }}, NULL) as {{ featureSet.project }}_{{ featureName }}_v{{ featureSet.version }}{% if loop.last %}{% else %}, {% endif %}
+ {% endfor %}
+FROM (
+SELECT
+ uuid,
+ event_timestamp,
+ {{ featureSet.entities | join(', ')}},
+ FIRST_VALUE(created_timestamp IGNORE NULLS) over w AS created_timestamp,
+ FIRST_VALUE({{ featureSet.project }}_{{ featureSet.name }}_v{{ featureSet.version }}_feature_timestamp IGNORE NULLS) over w AS {{ featureSet.project }}_{{ featureSet.name }}_v{{ featureSet.version }}_feature_timestamp,
+ is_entity_table
+FROM union_features
+WINDOW w AS (PARTITION BY {{ featureSet.entities | join(', ') }} ORDER BY event_timestamp DESC, is_entity_table DESC, created_timestamp DESC ROWS BETWEEN CURRENT ROW AND UNBOUNDED FOLLOWING)
+)
+/*
+ 3. Select only the rows from the entity table, and join the features from the original feature set table
+ to the dataset using the entity values, feature_timestamp, and created_timestamps.
+ */
+LEFT JOIN (
+SELECT
+ event_timestamp as {{ featureSet.project }}_{{ featureSet.name }}_v{{ featureSet.version }}_feature_timestamp,
+ created_timestamp,
+ {{ featureSet.entities | join(', ')}},
+ {% for featureName in featureSet.features %}
+ {{ featureName }} as {{ featureSet.project }}_{{ featureName }}_v{{ featureSet.version }}{% if loop.last %}{% else %}, {% endif %}
+ {% endfor %}
+FROM `{{ projectId }}.{{ datasetId }}.{{ featureSet.project }}_{{ featureSet.name }}_v{{ featureSet.version }}` WHERE event_timestamp <= '{{maxTimestamp}}'
+{% if featureSet.maxAge == 0 %}{% else %}AND event_timestamp >= Timestamp_sub(TIMESTAMP '{{ minTimestamp }}', interval {{ featureSet.maxAge }} second){% endif %}
+) USING ({{ featureSet.project }}_{{ featureSet.name }}_v{{ featureSet.version }}_feature_timestamp, created_timestamp, {{ featureSet.entities | join(', ')}})
+WHERE is_entity_table
+)
+/*
+ 4. Finally, deduplicate the rows by selecting the first occurrence of each entity table row UUID.
+ */
+SELECT
+ k.*
+FROM (
+ SELECT ARRAY_AGG(row LIMIT 1)[OFFSET(0)] k
+ FROM joined row
+ GROUP BY uuid
+)
\ No newline at end of file
diff --git a/storage/connectors/pom.xml b/storage/connectors/pom.xml
index 265aa63247..bb6883a0b0 100644
--- a/storage/connectors/pom.xml
+++ b/storage/connectors/pom.xml
@@ -16,9 +16,22 @@
         <module>redis</module>
+        <module>bigquery</module>
     </modules>
 
     <dependencies>
+        <dependency>
+            <groupId>dev.feast</groupId>
+            <artifactId>datatypes-java</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>dev.feast</groupId>
+            <artifactId>feast-storage-api</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+
         <dependency>
             <groupId>junit</groupId>
             <artifactId>junit</artifactId>