io.confluent.connect.s3.format.parquet.ParquetFormat and ParquetRecordWriterProvider writes value in record field of type map and enum as bytes in Parquet result
#713 · Open · richard-urena opened this issue on Feb 3, 2024 · 0 comments
Context:
My Confluent S3 Sink connector (v10.5.6) is configured to export Kafka messages from a topic in AWS MSK that were produced with a schema in the AWS Glue Schema Registry. I have successfully configured the AWSKafkaAvroConverter to handle deserialization of my messages, and I set the config property "format.class" to io.confluent.connect.s3.format.parquet.ParquetFormat, as that is the target format the owner of the S3 bucket expects.
Issue
The export and deserialization work fine, and the S3 output is Parquet as expected. However, two fields contain only raw bytes instead of the types specified in the schema: one is of type map and the other of type enum.
There is no error in the connector logs, so I am wondering whether these field types (map and enum) are supported for conversion to Parquet by the ParquetFormat and ParquetRecordWriterProvider classes. The other value types (string, boolean, int, and bytes) are written to the Parquet output in S3 with the correct types. A rough sketch of the schema shape is below.
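To show what I mean by "fields of type map and enum", here is a minimal sketch of the schema shape using Avro's SchemaBuilder. The record, field, and symbol names are illustrative placeholders, not the real schema:

```java
import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;

public class SchemaShapeExample {
    public static void main(String[] args) {
        // Illustrative record with the two field types in question:
        // a map (string -> string) and an enum. All names are placeholders.
        Schema schema = SchemaBuilder.record("ExampleRecord")
                .namespace("com.example")
                .fields()
                .name("attributes").type().map().values().stringType().noDefault()
                .name("status").type().enumeration("ExampleStatus")
                    .symbols("ACTIVE", "INACTIVE").noDefault()
                .endRecord();

        // Pretty-print the resulting Avro schema.
        System.out.println(schema.toString(true));
    }
}
```

In the Parquet files written to S3, the values of the two fields shaped like "attributes" and "status" above show up as bytes rather than as a map and an enum/string.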
Ask:
Any advice or leads would help my team troubleshoot this, since no connector error or exception is thrown. There is only a single version of the schema, and it is the one used to produce the records. A separate, non-connector Kafka consumer is able to deserialize the messages with the same schema without any issues (a sketch of that consumer is below).
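For completeness, the standalone consumer that reads the same topic correctly is essentially the following. This is a minimal sketch assuming the AWS Glue Schema Registry Kafka deserializer; the bootstrap servers and the field names inside the loop are placeholders, and the registry/topic names mirror the connector config below:

```java
import java.time.Duration;
import java.util.List;
import java.util.Properties;

import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import com.amazonaws.services.schemaregistry.deserializers.GlueSchemaRegistryKafkaDeserializer;
import com.amazonaws.services.schemaregistry.utils.AWSSchemaRegistryConstants;

public class DebugConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        // Placeholder bootstrap servers for the MSK cluster.
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "my-msk-bootstrap:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "parquet-debug-consumer");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                GlueSchemaRegistryKafkaDeserializer.class.getName());

        // Glue Schema Registry settings mirroring the connector's converter config.
        props.put(AWSSchemaRegistryConstants.AWS_REGION, "us-east-1");
        props.put(AWSSchemaRegistryConstants.AVRO_RECORD_TYPE, "GENERIC_RECORD");
        props.put(AWSSchemaRegistryConstants.REGISTRY_NAME, "my-schema-registry-1234");

        try (KafkaConsumer<String, Object> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(List.of("topic-live-streaming"));
            ConsumerRecords<String, Object> records = consumer.poll(Duration.ofSeconds(10));
            for (ConsumerRecord<String, Object> record : records) {
                GenericRecord value = (GenericRecord) record.value();
                // The map and enum fields come back as normal Avro values here
                // (field names are placeholders matching the schema sketch above).
                System.out.println(value.get("attributes") + " / " + value.get("status"));
            }
        }
    }
}
```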
Key properties of this config:
connector.class=io.confluent.connect.s3.S3SinkConnector
format.class=io.confluent.connect.s3.format.parquet.ParquetFormat
flush.size=1000
enhanced.avro.schema.support=true
parquet.codec=snappy
allow.optional.map.keys=true
s3.bucket.name=my-bucket-name
s3.region=us-east-1
topics=topic-live-streaming
topics.dir=data_streaming
value.converter=com.amazonaws.services.schemaregistry.kafkaconnect.AWSKafkaAvroConverter
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter.avroRecordType=GENERIC_RECORD
key.converter.avroRecordType=GENERIC_RECORD
value.converter.registry.name=my-schema-registry-1234
key.converter.registry.name=my-schema-registry-1234
I referred to the docs on this page to configure the connector for my use case: https://docs.confluent.io/kafka-connectors/s3-sink/current/configuration_options.html