Skip to content

Commit

Permalink
handle stream name fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
cgardens committed Nov 18, 2020
1 parent da059f0 commit 80d956a
Show file tree
Hide file tree
Showing 4 changed files with 56 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.commons.lang3.StringUtils;

public class CatalogHelpers {
Expand Down Expand Up @@ -154,8 +155,19 @@ public static boolean isValidIdentifier(String identifier) {
* @return list of stream names in the catalog that are invalid
*/
public static List<String> getInvalidStreamNames(AirbyteCatalog catalog) {
return catalog.getStreams().stream()
.map(AirbyteStream::getName)
return getInvalidStreamNames(catalog.getStreams().stream().map(AirbyteStream::getName));
}

/**
* @param catalog configured airbyte catalog
* @return list of stream names in the catalog that are invalid
*/
public static List<String> getInvalidStreamNames(ConfiguredAirbyteCatalog catalog) {
return getInvalidStreamNames(catalog.getStreams().stream().map(ConfiguredAirbyteStream::getName));
}

private static List<String> getInvalidStreamNames(Stream<String> names) {
return names
.filter(streamName -> !isValidIdentifier(streamName))
.collect(Collectors.toList());
}
Expand All @@ -165,14 +177,39 @@ public static List<String> getInvalidStreamNames(AirbyteCatalog catalog) {
* @return multimap of stream names to all invalid field names in that stream
*/
public static Multimap<String, String> getInvalidFieldNames(AirbyteCatalog catalog) {
Multimap<String, String> streamNameToInvalidFieldNames = Multimaps.newSetMultimap(new HashMap<>(), HashSet::new);
return getInvalidFieldNames(getStreamNameToJsonSchema(catalog));
}

/**
* @param catalog configured airbyte catalog
* @return multimap of stream names to all invalid field names in that stream
*/
public static Multimap<String, String> getInvalidFieldNames(ConfiguredAirbyteCatalog catalog) {
return getInvalidFieldNames(getStreamNameToJsonSchema(catalog));
}

private static Map<String, JsonNode> getStreamNameToJsonSchema(AirbyteCatalog catalog) {
return catalog.getStreams()
.stream()
.collect(Collectors.toMap(AirbyteStream::getName, AirbyteStream::getJsonSchema));
}

private static Map<String, JsonNode> getStreamNameToJsonSchema(ConfiguredAirbyteCatalog catalog) {
return catalog.getStreams()
.stream()
.collect(Collectors.toMap(ConfiguredAirbyteStream::getName, ConfiguredAirbyteStream::getJsonSchema));
}

private static Multimap<String, String> getInvalidFieldNames(Map<String, JsonNode> streamNameToJsonSchema) {
final Multimap<String, String> streamNameToInvalidFieldNames = Multimaps.newSetMultimap(new HashMap<>(), HashSet::new);

for (AirbyteStream stream : catalog.getStreams()) {
Set<String> invalidFieldNames = getAllFieldNames(stream.getJsonSchema()).stream()
for (final Map.Entry<String, JsonNode> entry : streamNameToJsonSchema.entrySet()) {
final Set<String> invalidFieldNames = getAllFieldNames(entry.getValue())
.stream()
.filter(streamName -> !isValidIdentifier(streamName))
.collect(Collectors.toSet());

streamNameToInvalidFieldNames.putAll(stream.getName(), invalidFieldNames);
streamNameToInvalidFieldNames.putAll(entry.getKey(), invalidFieldNames);
}

return streamNameToInvalidFieldNames;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,10 @@ public OutputAndStatus<StandardDiscoverCatalogOutput> run(final StandardDiscover
Multimap<String, String> streamNameToInvalidFieldNames = CatalogHelpers.getInvalidFieldNames(catalog.get());
if (!streamNameToInvalidFieldNames.isEmpty()) {
streamNameToInvalidFieldNames
.forEach((streamName, fieldNames) -> LOGGER.error("Cannot sync invalid field names for stream " + streamName + ": " + fieldNames));
.forEach((streamName, fieldNames) -> {
LOGGER.error("Cannot sync invalid field names for stream " + streamName + ": " + fieldNames);
LOGGER.error("Catalog: " + Jsons.serialize(catalog.get()));
});
return new OutputAndStatus<>(JobStatus.FAILED);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,10 @@
import io.airbyte.config.StandardTapConfig;
import io.airbyte.config.StandardTargetConfig;
import io.airbyte.config.State;
import io.airbyte.protocol.models.AirbyteCatalog;
import io.airbyte.protocol.models.AirbyteMessage;
import io.airbyte.protocol.models.AirbyteStream;
import io.airbyte.protocol.models.CatalogHelpers;
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
import io.airbyte.protocol.models.ConfiguredAirbyteStream;
import io.airbyte.workers.normalization.NormalizationRunner;
import io.airbyte.workers.protocols.Destination;
import io.airbyte.workers.protocols.MessageTracker;
Expand Down Expand Up @@ -138,12 +138,12 @@ public void cancel() {
cancelled.set(true);
}

private void removeInvalidStreams(AirbyteCatalog catalog) {
private void removeInvalidStreams(ConfiguredAirbyteCatalog catalog) {
final Set<String> invalidStreams = Sets.union(
new HashSet<>(CatalogHelpers.getInvalidStreamNames(catalog)),
CatalogHelpers.getInvalidFieldNames(catalog).keySet());

final List<AirbyteStream> streams = catalog.getStreams().stream()
final List<ConfiguredAirbyteStream> streams = catalog.getStreams().stream()
.filter(stream -> !invalidStreams.contains(stream.getName()))
.collect(Collectors.toList());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,9 @@
import io.airbyte.config.StandardSyncOutput;
import io.airbyte.config.StandardTapConfig;
import io.airbyte.config.StandardTargetConfig;
import io.airbyte.protocol.models.AirbyteCatalog;
import io.airbyte.protocol.models.AirbyteMessage;
import io.airbyte.protocol.models.AirbyteStream;
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
import io.airbyte.protocol.models.ConfiguredAirbyteStream;
import io.airbyte.workers.normalization.NormalizationRunner;
import io.airbyte.workers.protocols.airbyte.AirbyteDestination;
import io.airbyte.workers.protocols.airbyte.AirbyteMessageTracker;
Expand Down Expand Up @@ -86,12 +86,12 @@ void setup() throws Exception {
invalidSyncInput.setState(validSyncInput.getState());
invalidSyncInput.setSyncMode(validSyncInput.getSyncMode());

final AirbyteStream invalidStream = new AirbyteStream();
final ConfiguredAirbyteStream invalidStream = new ConfiguredAirbyteStream();
invalidStream.setName(INVALID_STREAM_NAME);
invalidStream.setJsonSchema(Jsons.deserialize("{}"));
List<AirbyteStream> streams = new ArrayList<>(validSyncInput.getCatalog().getStreams());
final List<ConfiguredAirbyteStream> streams = new ArrayList<>(validSyncInput.getCatalog().getStreams());
streams.add(invalidStream);
AirbyteCatalog catalog = new AirbyteCatalog();
final ConfiguredAirbyteCatalog catalog = new ConfiguredAirbyteCatalog();
catalog.setStreams(streams);
invalidSyncInput.setCatalog(catalog);

Expand Down

0 comments on commit 80d956a

Please sign in to comment.