diff --git a/airbyte-protocol/models/build.gradle b/airbyte-protocol/models/build.gradle index 1637f5c86e18b..e4199332b848d 100644 --- a/airbyte-protocol/models/build.gradle +++ b/airbyte-protocol/models/build.gradle @@ -5,7 +5,8 @@ plugins { } dependencies { - implementation group: 'javax.validation', name: 'validation-api', version: '1.1.0.Final' + implementation 'javax.validation:validation-api:1.1.0.Final' + implementation 'org.apache.commons:commons-lang3:3.11' } jsonSchema2Pojo { diff --git a/airbyte-protocol/models/src/main/java/io/airbyte/protocol/models/CatalogHelpers.java b/airbyte-protocol/models/src/main/java/io/airbyte/protocol/models/CatalogHelpers.java index 30ef4b75b7b3b..2389647276c41 100644 --- a/airbyte-protocol/models/src/main/java/io/airbyte/protocol/models/CatalogHelpers.java +++ b/airbyte-protocol/models/src/main/java/io/airbyte/protocol/models/CatalogHelpers.java @@ -25,14 +25,21 @@ package io.airbyte.protocol.models; import com.fasterxml.jackson.databind.JsonNode; +import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Lists; +import com.google.common.collect.Multimap; +import com.google.common.collect.Multimaps; import io.airbyte.commons.json.Jsons; import java.util.Arrays; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Set; import java.util.stream.Collectors; +import org.apache.commons.lang3.StringUtils; public class CatalogHelpers { @@ -77,4 +84,62 @@ public static Set getTopLevelFieldNames(final AirbyteStream stream) { return object.keySet(); } + /** + * @param node any json node + * @return a set of all keys for all objects within the node + */ + @VisibleForTesting + protected static Set getAllFieldNames(JsonNode node) { + Set allFieldNames = new HashSet<>(); + + Iterator fieldNames = node.fieldNames(); + while (fieldNames.hasNext()) { + String fieldName = fieldNames.next(); + allFieldNames.add(fieldName); + JsonNode fieldValue = node.get(fieldName); + if (fieldValue.isObject()) { + allFieldNames.addAll(getAllFieldNames(fieldValue)); + } + } + + return allFieldNames; + } + + /** + * @param identifier stream name or field name + * @return if the identifier matches the alphanumeric+underscore requirement for identifiers + */ + public static boolean isValidIdentifier(String identifier) { + return StringUtils.isAlphanumeric(identifier.replace("_", "")); + } + + /** + * @param catalog airbyte catalog + * @return list of stream names in the catalog that are invalid + */ + public static List getInvalidStreamNames(AirbyteCatalog catalog) { + return catalog.getStreams().stream() + .map(AirbyteStream::getName) + .filter(streamName -> !isValidIdentifier(streamName)) + .collect(Collectors.toList()); + } + + /** + * @param catalog airbyte catalog + * @return multimap of stream names to all invalid field names in that stream + */ + public static Multimap getInvalidFieldNames(AirbyteCatalog catalog) { + Multimap streamNameToInvalidFieldNames = Multimaps.newSetMultimap(new HashMap<>(), HashSet::new); + + for (AirbyteStream stream : catalog.getStreams()) { + Set invalidFieldNames = getAllFieldNames(stream.getJsonSchema()).stream() + .filter(streamName -> !isValidIdentifier(streamName)) + .collect(Collectors.toSet()); + + streamNameToInvalidFieldNames.putAll(stream.getName(), invalidFieldNames); + } + + return streamNameToInvalidFieldNames; + } + } diff --git a/airbyte-protocol/models/src/test/java/io/airbyte/protocol/models/CatalogHelpersTest.java b/airbyte-protocol/models/src/test/java/io/airbyte/protocol/models/CatalogHelpersTest.java index f20d1e2218f40..1fb0cb6e926c4 100644 --- a/airbyte-protocol/models/src/test/java/io/airbyte/protocol/models/CatalogHelpersTest.java +++ b/airbyte-protocol/models/src/test/java/io/airbyte/protocol/models/CatalogHelpersTest.java @@ -25,11 +25,21 @@ package io.airbyte.protocol.models; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertIterableEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; import com.fasterxml.jackson.databind.JsonNode; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Multimap; import com.google.common.collect.Sets; import io.airbyte.commons.json.Jsons; +import io.airbyte.commons.resources.MoreResources; import io.airbyte.protocol.models.Field.JsonSchemaPrimitive; +import java.io.IOException; +import java.util.Collections; +import java.util.List; import java.util.Set; import org.junit.jupiter.api.Test; @@ -51,4 +61,76 @@ void testGetTopLevelFieldNames() { assertEquals(Sets.newHashSet("name"), actualFieldNames); } + @Test + void testValidIdentifiers() { + assertTrue(CatalogHelpers.isValidIdentifier("identifier_name")); + assertTrue(CatalogHelpers.isValidIdentifier("iDenTiFieR_name")); + assertTrue(CatalogHelpers.isValidIdentifier("__identifier_name")); + assertTrue(CatalogHelpers.isValidIdentifier("IDENTIFIER_NAME")); + assertTrue(CatalogHelpers.isValidIdentifier("123identifier_name")); + assertTrue(CatalogHelpers.isValidIdentifier("i0d0e0n0t0i0f0i0e0r0n0a0m0e")); + assertTrue(CatalogHelpers.isValidIdentifier("identifiêr")); + assertTrue(CatalogHelpers.isValidIdentifier("a_unicode_name_文")); + assertTrue(CatalogHelpers.isValidIdentifier("identifier__name__")); + } + + @Test + void testInvalidIdentifiers() { + assertFalse(CatalogHelpers.isValidIdentifier("invalid-identifier")); + assertFalse(CatalogHelpers.isValidIdentifier("\"identifier name")); + assertFalse(CatalogHelpers.isValidIdentifier("$identifier")); + assertFalse(CatalogHelpers.isValidIdentifier("identifier name")); + assertFalse(CatalogHelpers.isValidIdentifier("identifier%")); + assertFalse(CatalogHelpers.isValidIdentifier("`identifier`")); + assertFalse(CatalogHelpers.isValidIdentifier("'identifier'")); + } + + @Test + void testGetInvalidStreamNames() { + final String validStreamName = "Valid_Stream"; + final AirbyteStream validStream = new AirbyteStream(); + validStream.setName(validStreamName); + + final String invalidStreamName = "invalid stream"; + AirbyteStream invalidStream = new AirbyteStream(); + invalidStream.setName(invalidStreamName); + + AirbyteCatalog catalog = new AirbyteCatalog(); + catalog.setStreams(List.of(validStream, invalidStream)); + + List invalidStreamNames = CatalogHelpers.getInvalidStreamNames(catalog); + assertIterableEquals(Collections.singleton(invalidStreamName), invalidStreamNames); + } + + @Test + void testGetFieldNames() throws IOException { + JsonNode node = Jsons.deserialize(MoreResources.readResource("valid_schema.json")); + Set actualFieldNames = CatalogHelpers.getAllFieldNames(node); + Set expectedFieldNames = ImmutableSet.of("type", "properties", "format", "date", "CAD", "HKD", "ISK", "PHP", "DKK", "HUF", "文"); + + assertEquals(expectedFieldNames, actualFieldNames); + } + + @Test + void testGetInvalidFieldNames() throws IOException { + final String validStreamName = "Valid_Stream"; + final AirbyteStream validStream = new AirbyteStream(); + validStream.setName(validStreamName); + JsonNode validSchema = Jsons.deserialize(MoreResources.readResource("valid_schema.json")); + validStream.setJsonSchema(validSchema); + + final String invalidStreamName = "invalid stream"; + AirbyteStream invalidStream = new AirbyteStream(); + invalidStream.setName(invalidStreamName); + JsonNode invalidSchema = Jsons.deserialize(MoreResources.readResource("invalid_schema.json")); + invalidStream.setJsonSchema(invalidSchema); + + AirbyteCatalog catalog = new AirbyteCatalog(); + catalog.setStreams(List.of(validStream, invalidStream)); + + Multimap streamNameToInvalidFieldNames = CatalogHelpers.getInvalidFieldNames(catalog); + assertIterableEquals(Collections.singleton(invalidStreamName), streamNameToInvalidFieldNames.keySet()); + assertIterableEquals(ImmutableList.of("C A D", "\"type"), streamNameToInvalidFieldNames.get(invalidStreamName)); + } + } diff --git a/airbyte-protocol/models/src/test/resources/invalid_schema.json b/airbyte-protocol/models/src/test/resources/invalid_schema.json new file mode 100644 index 0000000000000..389fc9ce37b77 --- /dev/null +++ b/airbyte-protocol/models/src/test/resources/invalid_schema.json @@ -0,0 +1,13 @@ +{ + "type": "object", + "properties": { + "date": { "type": "string", "format": "date-time" }, + "C A D": { "type": ["null", "number"] }, + "HKD": { "type": ["null", "number"] }, + "ISK": { "type": ["null", "number"] }, + "PHP": { "type": ["null", "number"] }, + "DKK": { "type": ["null", "number"] }, + "HUF": { "\"type": ["null", "number"] }, + "CZK": { "type": ["null", "number"] } + } +} diff --git a/airbyte-protocol/models/src/test/resources/valid_schema.json b/airbyte-protocol/models/src/test/resources/valid_schema.json new file mode 100644 index 0000000000000..1b51dad71d8f2 --- /dev/null +++ b/airbyte-protocol/models/src/test/resources/valid_schema.json @@ -0,0 +1,13 @@ +{ + "type": "object", + "properties": { + "date": { "type": "string", "format": "date-time" }, + "CAD": { "type": ["null", "number"] }, + "HKD": { "type": ["null", "number"] }, + "ISK": { "type": ["null", "number"] }, + "PHP": { "type": ["null", "number"] }, + "DKK": { "type": ["null", "number"] }, + "HUF": { "type": ["null", "number"] }, + "文": { "type": ["null", "number"] } + } +} diff --git a/airbyte-scheduler/src/main/java/io/airbyte/scheduler/WorkerRunFactory.java b/airbyte-scheduler/src/main/java/io/airbyte/scheduler/WorkerRunFactory.java index 518cfafdb1ad1..0a37cc5ee32e7 100644 --- a/airbyte-scheduler/src/main/java/io/airbyte/scheduler/WorkerRunFactory.java +++ b/airbyte-scheduler/src/main/java/io/airbyte/scheduler/WorkerRunFactory.java @@ -137,7 +137,7 @@ private WorkerRun createSyncWorker(JobSyncConfig config, Path jobRoot) { jobRoot, syncInput, new JobOutputSyncWorker( - new DefaultSyncWorker<>( + new DefaultSyncWorker( new DefaultAirbyteSource(sourceLauncher), new DefaultAirbyteDestination(destinationLauncher), new AirbyteMessageTracker(), diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/DefaultDiscoverCatalogWorker.java b/airbyte-workers/src/main/java/io/airbyte/workers/DefaultDiscoverCatalogWorker.java index dbca1f9ea86d8..e6dba4f4f4b66 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/DefaultDiscoverCatalogWorker.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/DefaultDiscoverCatalogWorker.java @@ -25,6 +25,7 @@ package io.airbyte.workers; import com.fasterxml.jackson.databind.JsonNode; +import com.google.common.collect.Multimap; import io.airbyte.commons.io.IOs; import io.airbyte.commons.io.LineGobbler; import io.airbyte.commons.json.Jsons; @@ -33,11 +34,13 @@ import io.airbyte.protocol.models.AirbyteCatalog; import io.airbyte.protocol.models.AirbyteMessage; import io.airbyte.protocol.models.AirbyteMessage.Type; +import io.airbyte.protocol.models.CatalogHelpers; import io.airbyte.workers.process.IntegrationLauncher; import io.airbyte.workers.protocols.airbyte.AirbyteStreamFactory; import io.airbyte.workers.protocols.airbyte.DefaultAirbyteStreamFactory; import java.io.InputStream; import java.nio.file.Path; +import java.util.List; import java.util.Optional; import java.util.concurrent.TimeUnit; import org.slf4j.Logger; @@ -87,7 +90,21 @@ public OutputAndStatus run(final StandardDiscover int exitCode = process.exitValue(); if (exitCode == 0) { if (catalog.isEmpty()) { - LOGGER.error("integration failed to output a catalog struct."); + LOGGER.error("Integration failed to output a catalog struct."); + return new OutputAndStatus<>(JobStatus.FAILED); + } + + List invalidStreamNames = CatalogHelpers.getInvalidStreamNames(catalog.get()); + + if (!invalidStreamNames.isEmpty()) { + invalidStreamNames.forEach(streamName -> LOGGER.error("Cannot sync invalid stream name: " + streamName)); + return new OutputAndStatus<>(JobStatus.FAILED); + } + + Multimap streamNameToInvalidFieldNames = CatalogHelpers.getInvalidFieldNames(catalog.get()); + if (!streamNameToInvalidFieldNames.isEmpty()) { + streamNameToInvalidFieldNames + .forEach((streamName, fieldNames) -> LOGGER.error("Cannot sync invalid field names for stream " + streamName + ": " + fieldNames)); return new OutputAndStatus<>(JobStatus.FAILED); } diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/DefaultSyncWorker.java b/airbyte-workers/src/main/java/io/airbyte/workers/DefaultSyncWorker.java index dab5de1f1fa3c..34afdc04889c5 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/DefaultSyncWorker.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/DefaultSyncWorker.java @@ -24,37 +24,46 @@ package io.airbyte.workers; +import com.google.common.collect.Sets; import io.airbyte.config.StandardSyncInput; import io.airbyte.config.StandardSyncOutput; import io.airbyte.config.StandardSyncSummary; import io.airbyte.config.StandardTapConfig; import io.airbyte.config.StandardTargetConfig; import io.airbyte.config.State; +import io.airbyte.protocol.models.AirbyteCatalog; +import io.airbyte.protocol.models.AirbyteMessage; +import io.airbyte.protocol.models.AirbyteStream; +import io.airbyte.protocol.models.CatalogHelpers; import io.airbyte.workers.normalization.NormalizationRunner; import io.airbyte.workers.protocols.Destination; import io.airbyte.workers.protocols.MessageTracker; import io.airbyte.workers.protocols.Source; import java.nio.file.Files; import java.nio.file.Path; +import java.util.HashSet; +import java.util.List; import java.util.Optional; +import java.util.Set; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.stream.Collectors; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class DefaultSyncWorker implements SyncWorker { +public class DefaultSyncWorker implements SyncWorker { private static final Logger LOGGER = LoggerFactory.getLogger(DefaultSyncWorker.class); - private final Source source; - private final Destination destination; - private final MessageTracker messageTracker; + private final Source source; + private final Destination destination; + private final MessageTracker messageTracker; private final NormalizationRunner normalizationRunner; private final AtomicBoolean cancelled; - public DefaultSyncWorker(final Source source, - final Destination destination, - final MessageTracker messageTracker, + public DefaultSyncWorker(final Source source, + final Destination destination, + final MessageTracker messageTracker, final NormalizationRunner normalizationRunner) { this.source = source; this.destination = destination; @@ -68,6 +77,9 @@ public DefaultSyncWorker(final Source source, public OutputAndStatus run(StandardSyncInput syncInput, Path jobRoot) { long startTime = System.currentTimeMillis(); + // clean catalog object + removeInvalidStreams(syncInput.getCatalog()); + final StandardTapConfig tapConfig = WorkerUtils.syncToTapConfig(syncInput); final StandardTargetConfig targetConfig = WorkerUtils.syncToTargetConfig(syncInput); @@ -77,11 +89,16 @@ public OutputAndStatus run(StandardSyncInput syncInput, Path source.start(tapConfig, jobRoot); while (!cancelled.get() && !source.isFinished()) { - final Optional maybeMessage = source.attemptRead(); + final Optional maybeMessage = source.attemptRead(); if (maybeMessage.isPresent()) { - final T message = maybeMessage.get(); - messageTracker.accept(message); - destination.accept(message); + final AirbyteMessage message = maybeMessage.get(); + + if (message.getType().equals(AirbyteMessage.Type.RECORD) && !CatalogHelpers.isValidIdentifier(message.getRecord().getStream())) { + LOGGER.error("Filtered out record for invalid stream: " + message.getRecord().getStream()); + } else { + messageTracker.accept(message); + destination.accept(message); + } } } @@ -122,4 +139,16 @@ public void cancel() { cancelled.set(true); } + private void removeInvalidStreams(AirbyteCatalog catalog) { + final Set invalidStreams = Sets.union( + new HashSet<>(CatalogHelpers.getInvalidStreamNames(catalog)), + CatalogHelpers.getInvalidFieldNames(catalog).keySet()); + + final List streams = catalog.getStreams().stream() + .filter(stream -> !invalidStreams.contains(stream.getName())) + .collect(Collectors.toList()); + + catalog.setStreams(streams); + } + } diff --git a/airbyte-workers/src/test/java/io/airbyte/workers/DefaultSyncWorkerTest.java b/airbyte-workers/src/test/java/io/airbyte/workers/DefaultSyncWorkerTest.java index 19f7551136f06..4df4c63aea165 100644 --- a/airbyte-workers/src/test/java/io/airbyte/workers/DefaultSyncWorkerTest.java +++ b/airbyte-workers/src/test/java/io/airbyte/workers/DefaultSyncWorkerTest.java @@ -26,15 +26,18 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; -import io.airbyte.config.StandardSync; +import io.airbyte.commons.json.Jsons; import io.airbyte.config.StandardSyncInput; import io.airbyte.config.StandardSyncOutput; import io.airbyte.config.StandardTapConfig; import io.airbyte.config.StandardTargetConfig; +import io.airbyte.protocol.models.AirbyteCatalog; import io.airbyte.protocol.models.AirbyteMessage; +import io.airbyte.protocol.models.AirbyteStream; import io.airbyte.workers.normalization.NormalizationRunner; import io.airbyte.workers.protocols.airbyte.AirbyteDestination; import io.airbyte.workers.protocols.airbyte.AirbyteMessageTracker; @@ -42,8 +45,9 @@ import io.airbyte.workers.protocols.airbyte.AirbyteSource; import java.nio.file.Files; import java.nio.file.Path; +import java.util.ArrayList; +import java.util.List; import java.util.Optional; -import org.apache.commons.lang3.tuple.ImmutablePair; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -51,15 +55,17 @@ class DefaultSyncWorkerTest { private static final Path WORKSPACE_ROOT = Path.of("workspaces/10"); private static final String STREAM_NAME = "user_preferences"; + private static final String INVALID_STREAM_NAME = "invalid stream name"; private static final String FIELD_NAME = "favorite_color"; private static final AirbyteMessage RECORD_MESSAGE1 = AirbyteMessageUtils.createRecordMessage(STREAM_NAME, FIELD_NAME, "blue"); private static final AirbyteMessage RECORD_MESSAGE2 = AirbyteMessageUtils.createRecordMessage(STREAM_NAME, FIELD_NAME, "yellow"); + private static final AirbyteMessage INVALID_RECORD_MESSAGE = AirbyteMessageUtils.createRecordMessage(INVALID_STREAM_NAME, FIELD_NAME, "yellow"); private Path jobRoot; private Path normalizationRoot; private AirbyteSource tap; private AirbyteDestination target; - private StandardSyncInput syncInput; + private StandardSyncInput invalidSyncInput; private StandardTapConfig tapConfig; private StandardTargetConfig targetConfig; private NormalizationRunner normalizationRunner; @@ -70,27 +76,44 @@ void setup() throws Exception { jobRoot = Files.createDirectories(Files.createTempDirectory("test").resolve(WORKSPACE_ROOT)); normalizationRoot = jobRoot.resolve("normalize"); - final ImmutablePair syncPair = TestConfigHelpers.createSyncConfig(); - syncInput = syncPair.getValue(); + final StandardSyncInput validSyncInput = TestConfigHelpers.createSyncConfig().getValue(); - tapConfig = WorkerUtils.syncToTapConfig(syncInput); - targetConfig = WorkerUtils.syncToTargetConfig(syncInput); + // create sync input with invalid stream to ensure it is filtered out + invalidSyncInput = new StandardSyncInput(); + invalidSyncInput.setConnectionId(validSyncInput.getConnectionId()); + invalidSyncInput.setDestinationConnection(validSyncInput.getDestinationConnection()); + invalidSyncInput.setSourceConnection(validSyncInput.getSourceConnection()); + invalidSyncInput.setState(validSyncInput.getState()); + invalidSyncInput.setSyncMode(validSyncInput.getSyncMode()); + + final AirbyteStream invalidStream = new AirbyteStream(); + invalidStream.setName(INVALID_STREAM_NAME); + invalidStream.setJsonSchema(Jsons.deserialize("{}")); + List streams = new ArrayList<>(validSyncInput.getCatalog().getStreams()); + streams.add(invalidStream); + AirbyteCatalog catalog = new AirbyteCatalog(); + catalog.setStreams(streams); + invalidSyncInput.setCatalog(catalog); + + tapConfig = WorkerUtils.syncToTapConfig(validSyncInput); + targetConfig = WorkerUtils.syncToTargetConfig(validSyncInput); tap = mock(AirbyteSource.class); target = mock(AirbyteDestination.class); normalizationRunner = mock(NormalizationRunner.class); - when(tap.isFinished()).thenReturn(false, false, false, true); - when(tap.attemptRead()).thenReturn(Optional.of(RECORD_MESSAGE1), Optional.empty(), Optional.of(RECORD_MESSAGE2)); + when(tap.isFinished()).thenReturn(false, false, false, false, true); + when(tap.attemptRead()).thenReturn(Optional.of(RECORD_MESSAGE1), Optional.empty(), Optional.of(RECORD_MESSAGE2), + Optional.of(INVALID_RECORD_MESSAGE)); when(normalizationRunner.normalize(normalizationRoot, targetConfig.getDestinationConnectionConfiguration(), targetConfig.getCatalog())) .thenReturn(true); } @Test void test() throws Exception { - final DefaultSyncWorker defaultSyncWorker = - new DefaultSyncWorker<>(tap, target, new AirbyteMessageTracker(), normalizationRunner); - final OutputAndStatus run = defaultSyncWorker.run(syncInput, jobRoot); + final DefaultSyncWorker defaultSyncWorker = + new DefaultSyncWorker(tap, target, new AirbyteMessageTracker(), normalizationRunner); + final OutputAndStatus run = defaultSyncWorker.run(invalidSyncInput, jobRoot); assertEquals(JobStatus.SUCCEEDED, run.getStatus()); @@ -98,6 +121,7 @@ void test() throws Exception { verify(target).start(targetConfig, jobRoot); verify(target).accept(RECORD_MESSAGE1); verify(target).accept(RECORD_MESSAGE2); + verify(target, never()).accept(INVALID_RECORD_MESSAGE); verify(normalizationRunner).start(); verify(normalizationRunner).normalize(normalizationRoot, targetConfig.getDestinationConnectionConfiguration(), targetConfig.getCatalog()); verify(normalizationRunner).close(); diff --git a/docs/architecture/airbyte-specification.md b/docs/architecture/airbyte-specification.md index 44fe5257609a4..8088d9ebff562 100644 --- a/docs/architecture/airbyte-specification.md +++ b/docs/architecture/airbyte-specification.md @@ -162,7 +162,7 @@ read(Config, AirbyteCatalog, State) -> Stream } ``` -**Note:** Airbyte only supports stream and field names which are alphanumeric or contain underscores, as identified by the regex `[a-zA-Z0-9_]*"`. Syncs may fail if they attempt to sync streams or fields whose names contain disallowed characters. +**Note:** Airbyte only supports stream and field names that contain only Unicode alphabet characters, numbers, or underscores. The discovery process will fail if any stream names or field names are invalid. Syncs will filter out any stream that has an invalid name or any invalid fields. #### Read