-
Notifications
You must be signed in to change notification settings - Fork 4.4k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
stream/field name validation/filtering #1004
Changes from all commits
5d70768
4914075
7842d2d
f08d713
de7a777
f7862da
ee8db5f
fd71a2e
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,13 @@ | ||
{ | ||
"type": "object", | ||
"properties": { | ||
"date": { "type": "string", "format": "date-time" }, | ||
"C A D": { "type": ["null", "number"] }, | ||
"HKD": { "type": ["null", "number"] }, | ||
"ISK": { "type": ["null", "number"] }, | ||
"PHP": { "type": ["null", "number"] }, | ||
"DKK": { "type": ["null", "number"] }, | ||
"HUF": { "\"type": ["null", "number"] }, | ||
"CZK": { "type": ["null", "number"] } | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,13 @@ | ||
{ | ||
"type": "object", | ||
"properties": { | ||
"date": { "type": "string", "format": "date-time" }, | ||
"CAD": { "type": ["null", "number"] }, | ||
"HKD": { "type": ["null", "number"] }, | ||
"ISK": { "type": ["null", "number"] }, | ||
"PHP": { "type": ["null", "number"] }, | ||
"DKK": { "type": ["null", "number"] }, | ||
"HUF": { "type": ["null", "number"] }, | ||
"文": { "type": ["null", "number"] } | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -25,6 +25,7 @@ | |
package io.airbyte.workers; | ||
|
||
import com.fasterxml.jackson.databind.JsonNode; | ||
import com.google.common.collect.Multimap; | ||
import io.airbyte.commons.io.IOs; | ||
import io.airbyte.commons.io.LineGobbler; | ||
import io.airbyte.commons.json.Jsons; | ||
|
@@ -33,11 +34,13 @@ | |
import io.airbyte.protocol.models.AirbyteCatalog; | ||
import io.airbyte.protocol.models.AirbyteMessage; | ||
import io.airbyte.protocol.models.AirbyteMessage.Type; | ||
import io.airbyte.protocol.models.CatalogHelpers; | ||
import io.airbyte.workers.process.IntegrationLauncher; | ||
import io.airbyte.workers.protocols.airbyte.AirbyteStreamFactory; | ||
import io.airbyte.workers.protocols.airbyte.DefaultAirbyteStreamFactory; | ||
import java.io.InputStream; | ||
import java.nio.file.Path; | ||
import java.util.List; | ||
import java.util.Optional; | ||
import java.util.concurrent.TimeUnit; | ||
import org.slf4j.Logger; | ||
|
@@ -87,7 +90,21 @@ public OutputAndStatus<StandardDiscoverCatalogOutput> run(final StandardDiscover | |
int exitCode = process.exitValue(); | ||
if (exitCode == 0) { | ||
if (catalog.isEmpty()) { | ||
LOGGER.error("integration failed to output a catalog struct."); | ||
LOGGER.error("Integration failed to output a catalog struct."); | ||
return new OutputAndStatus<>(JobStatus.FAILED); | ||
} | ||
|
||
List<String> invalidStreamNames = CatalogHelpers.getInvalidStreamNames(catalog.get()); | ||
|
||
if (!invalidStreamNames.isEmpty()) { | ||
invalidStreamNames.forEach(streamName -> LOGGER.error("Cannot sync invalid stream name: " + streamName)); | ||
return new OutputAndStatus<>(JobStatus.FAILED); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. can we also return a message describing why discovery failed so we can show this on the UI? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I understand that, but I'm just worried that in its current state, there is no way for the user to know what's wrong. It'll just look like airbyte is broken if e.g: they sync Gsheets with a space in the name, and all they'll see on the UI is "an error happened" |
||
} | ||
|
||
Multimap<String, String> streamNameToInvalidFieldNames = CatalogHelpers.getInvalidFieldNames(catalog.get()); | ||
if (!streamNameToInvalidFieldNames.isEmpty()) { | ||
streamNameToInvalidFieldNames | ||
.forEach((streamName, fieldNames) -> LOGGER.error("Cannot sync invalid field names for stream " + streamName + ": " + fieldNames)); | ||
return new OutputAndStatus<>(JobStatus.FAILED); | ||
} | ||
|
||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We should have a test with Unicode
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
added