diff --git a/storage/connectors/bigquery/src/main/java/feast/storage/connectors/bigquery/writer/BigQueryWrite.java b/storage/connectors/bigquery/src/main/java/feast/storage/connectors/bigquery/writer/BigQueryWrite.java index 6916534e4f..a9d5fb6438 100644 --- a/storage/connectors/bigquery/src/main/java/feast/storage/connectors/bigquery/writer/BigQueryWrite.java +++ b/storage/connectors/bigquery/src/main/java/feast/storage/connectors/bigquery/writer/BigQueryWrite.java @@ -218,14 +218,14 @@ public void process(ProcessContext c) { @ProcessElement public void process(ProcessContext c) { CoGbkResult result = c.element().getValue(); + boolean ready = result.getAll(successTag).iterator().hasNext(); + if (!ready) { + return; + } + result - .getAll(successTag) - .forEach( - success -> - result - .getAll(inputTag) - .forEach( - rows -> rows.getFeatureRows().forEachRemaining(c::output))); + .getAll(inputTag) + .forEach(rows -> rows.getFeatureRows().forEachRemaining(c::output)); } })); } diff --git a/storage/connectors/bigquery/src/test/java/feast/storage/connectors/bigquery/writer/BigQuerySinkTest.java b/storage/connectors/bigquery/src/test/java/feast/storage/connectors/bigquery/writer/BigQuerySinkTest.java index 18c453ed67..3f35c5e4ae 100644 --- a/storage/connectors/bigquery/src/test/java/feast/storage/connectors/bigquery/writer/BigQuerySinkTest.java +++ b/storage/connectors/bigquery/src/test/java/feast/storage/connectors/bigquery/writer/BigQuerySinkTest.java @@ -20,6 +20,7 @@ import static feast.storage.common.testing.TestUtil.field; import static feast.storage.connectors.bigquery.writer.FeatureSetSpecToTableSchema.*; import static org.hamcrest.CoreMatchers.*; +import static org.hamcrest.Matchers.containsInAnyOrder; import static org.junit.Assert.*; import static org.mockito.Mockito.*; import static org.mockito.MockitoAnnotations.initMocks; @@ -464,7 +465,7 @@ public void featureRowCompressShouldPackAndUnpackSuccessfully() { PCollection result = p.apply(Create.of(inputWithNulls)) .apply("KV", ParDo.of(new ExtractKV())) - .apply(new CompactFeatureRows(1000)) + .apply(new CompactFeatureRows(10000)) .apply("Flat", ParDo.of(new FlatMap())); List inputWithoutNulls = dropNullFeature(input); @@ -480,7 +481,15 @@ public void featureRowCompressShouldPackAndUnpackSuccessfully() { .addAllFields(copyFieldsWithout(rowWithNull, "entity", "null_value")) .build()); - PAssert.that(result).containsInAnyOrder(inputWithoutNulls); + PAssert.that(result) + .satisfies( + actual -> { + List actualSorted = sortFeaturesByName(Lists.newArrayList(actual)); + List expectedSorted = sortFeaturesByName(inputWithoutNulls); + + assertThat(actualSorted, containsInAnyOrder(expectedSorted.toArray())); + return null; + }); p.run(); } @@ -490,10 +499,7 @@ private List dropNullFeature(List input) { r -> FeatureRow.newBuilder() .setFeatureSet(r.getFeatureSet()) - .addAllFields( - r.getFieldsList().stream() - .filter(f -> !f.getName().equals("null_value")) - .collect(Collectors.toList())) + .addAllFields(copyFieldsWithout(r, "null_value")) .build()) .collect(Collectors.toList()); } @@ -505,6 +511,21 @@ private List copyFieldsWithout(FeatureRow row, String... excep .collect(Collectors.toList()); } + public static List sortFeaturesByName(List rows) { + return rows.stream() + .map( + row -> { + List fieldsList = Lists.newArrayList(row.getFieldsList()); + fieldsList.sort(Comparator.comparing(FieldProto.Field::getName)); + + return FeatureRow.newBuilder() + .setFeatureSet(row.getFeatureSet()) + .addAllFields(fieldsList) + .build(); + }) + .collect(Collectors.toList()); + } + public static class TableAnswer implements Answer, Serializable { TableId tableId; TableDefinition tableDefinition;