diff --git a/Dockerfile b/Dockerfile index 7ba7a9d..387153d 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,4 +1,4 @@ -FROM confluentinc/cp-kafka-connect-base:5.5.0 +FROM confluentinc/cp-kafka-connect-base:5.5.1 ARG PROJECT_VERSION ENV CONNECT_PLUGIN_PATH="/usr/share/java,/usr/share/confluent-hub-components" diff --git a/config/kafka-connect-fs.properties b/config/kafka-connect-fs.properties index 27b9eb9..8596475 100644 --- a/config/kafka-connect-fs.properties +++ b/config/kafka-connect-fs.properties @@ -7,5 +7,6 @@ policy.class=com.github.mmolimar.kafka.connect.fs.policy.SimplePolicy policy.recursive=true policy.regexp=^.*\.txt$ policy.batch_size=0 +policy.cleanup=none file_reader.class=com.github.mmolimar.kafka.connect.fs.file.reader.TextFileReader file_reader.batch_size=0 diff --git a/docker-compose.yml b/docker-compose.yml index d03f109..cbbc7bd 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -1,7 +1,7 @@ version: '3' services: cp-zookeeper: - image: confluentinc/cp-zookeeper:5.5.0 + image: confluentinc/cp-zookeeper:5.5.1 hostname: zookeeper container_name: zookeeper ports: @@ -11,7 +11,7 @@ services: ZOOKEEPER_TICK_TIME: 2000 cp-kafka: - image: confluentinc/cp-kafka:5.5.0 + image: confluentinc/cp-kafka:5.5.1 hostname: kafka container_name: kafka depends_on: @@ -32,7 +32,7 @@ services: CONFLUENT_METRICS_ENABLE: 'false' cp-schema-registry: - image: confluentinc/cp-schema-registry:5.5.0 + image: confluentinc/cp-schema-registry:5.5.1 hostname: schema-registry container_name: schema-registry depends_on: @@ -45,7 +45,7 @@ services: SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL: 'zookeeper:2181' connect-fs: - image: mmolimar/kafka-connect-fs:1.1.0 + image: mmolimar/kafka-connect-fs:1.2.0 container_name: connect depends_on: - cp-kafka diff --git a/docs/source/conf.py b/docs/source/conf.py index 724450a..7c79907 100644 --- a/docs/source/conf.py +++ b/docs/source/conf.py @@ -55,9 +55,9 @@ # built documents. # # The short X.Y version. -version = '1.1' +version = '1.2' # The full version, including alpha/beta/rc tags. -release = '1.1' +release = '1.2' # The language for content autogenerated by Sphinx. Refer to documentation # for a list of supported languages. diff --git a/docs/source/config_options.rst b/docs/source/config_options.rst index 0abb591..d82f238 100644 --- a/docs/source/config_options.rst +++ b/docs/source/config_options.rst @@ -39,7 +39,7 @@ General config properties for this connector. Comma-separated URIs of the FS(s). They can be URIs pointing directly to a file in the FS and also can be dynamic using expressions for modifying the URIs in runtime. These expressions have the form ``${XXX}`` where XXX represents a pattern from ``java.time.format.DateTimeFormatter`` - Java class. + `Java class `__. * Type: string * Importance: high @@ -98,6 +98,27 @@ General config properties for this connector. * Default: ``0`` * Importance: medium +``policy.cleanup`` + Cleanup strategy to use when skipping files. It's possible to move these files to another folder, remove them + or do nothing. + + * Type: enum (available values ``none``, ``move`` and ``delete``) + * Default: ``none`` + * Importance: medium + +``policy.cleanup.move`` + Target directory to move files for the ``move`` cleanup strategy. Mandatory just in case of using this strategy. + + * Type: string + * Importance: medium + +``policy.cleanup.move.prefix`` + Prefix to set to the filename in moved files. 
+ + * Type: string + * Default: ```` + * Importance: low + ``policy..`` This represents custom properties you can include based on the policy class specified. @@ -224,20 +245,6 @@ File readers Some file readers have custom properties to define and others don't. So, depending on the configuration you'll have to take into account their properties. -.. _config_options-filereaders-avro: - -Avro --------------------------------------------- - -In order to configure custom properties for this reader, the name you must use is ``avro``. - -``file_reader.avro.schema`` - Avro schema in JSON format to use when reading a file. - If not specified, the reader will use the schema defined in the file. - - * Type: string - * Importance: medium - .. _config_options-filereaders-parquet: Parquet @@ -257,6 +264,20 @@ In order to configure custom properties for this reader, the name you must use i * Type: string * Importance: medium +.. _config_options-filereaders-avro: + +Avro +-------------------------------------------- + +In order to configure custom properties for this reader, the name you must use is ``avro``. + +``file_reader.avro.schema`` + Avro schema in JSON format to use when reading a file. + If not specified, the reader will use the schema defined in the file. + + * Type: string + * Importance: medium + .. _config_options-filereaders-orc: ORC @@ -279,7 +300,7 @@ In order to configure custom properties for this reader, the name you must use i * Default: ``false`` * Importance: medium -.. _config_options-filereaders-json: +.. _config_options-filereaders-sequencefile: SequenceFile -------------------------------------------- @@ -307,50 +328,231 @@ In order to configure custom properties for this reader, the name you must use i * Default: ``4096`` * Importance: low -.. _config_options-filereaders-json: +.. _config_options-filereaders-cobol: -JSON +Cobol -------------------------------------------- -To configure custom properties for this reader, the name you must use is ``json``. +In order to configure custom properties for this reader, the name you must use is ``cobol``. -``file_reader.json.record_per_line`` - If enabled, the reader will read each line as a record. Otherwise, the reader will read the full - content of the file as a record. +``file_reader.cobol.copybook.content`` + The content of the copybook. It is mandatory if property ``file_reader.cobol.copybook.path`` is not set. + + * Type: string + * Default: ``null`` + * Importance: high + +``file_reader.cobol.copybook.path`` + Copybook file path in the file system to be used. It is mandatory if property ``file_reader.cobol.copybook.content`` + is not set. + + * Type: string + * Default: ``null`` + * Importance: high + +``file_reader.cobol.reader.is_ebcdic`` + If the input data file encoding is EBCDIC, otherwise it is ASCII. * Type: boolean * Default: ``true`` * Importance: medium -``file_reader.json.deserialization.`` - Deserialization feature to use when reading a JSON file. You can add as much as you like - based on the ones defined `here. `__ +``file_reader.cobol.reader.ebcdic_code_page`` + Code page to be used for EBCDIC to ASCII / Unicode conversions. + + * Type: string + * Default: ``common`` + * Importance: medium + +``file_reader.cobol.reader.is_record_sequence`` + If the input file has 4 byte record length headers. * Type: boolean + * Default: ``false`` * Importance: medium -``file_reader.json.encoding`` - Encoding to use for reading a file. If not specified, the reader will use the default encoding. 
+``file_reader.cobol.reader.floating_point_format`` + Format used for the floating-point numbers. - * Type: string - * Default: based on the locale and charset of the underlying operating system. + * Type: enum (available values ``ibm``, ``ibm_little_endian``, ``ieee754``, and ``ieee754_little_endian``) + * Default: ``ibm`` * Importance: medium -``file_reader.json.compression.type`` - Compression type to use when reading a file. +``file_reader.cobol.reader.schema_policy`` + Specifies a policy to transform the input schema. - * Type: enum (available values ``bzip2``, ``gzip`` and ``none``) - * Default: ``none`` + * Type: enum (available values ``keep_original`` and ``collapse_root``) + * Default: ``keep_original`` * Importance: medium -``file_reader.json.compression.concatenated`` - Flag to specify if the decompression of the reader will finish at the end of the file or after - the first compressed stream. +``file_reader.cobol.reader.string_trimming_policy`` + The trim to apply for records with string data types. + + * Type: enum (available values ``both``, ``left``, ``right`` and ``none``) + * Default: ``both`` + * Importance: medium + +``file_reader.cobol.reader.start_offset`` + An offset to the start of the record in each binary data block. + + * Type: int + * Default: ``0`` + * Importance: medium + +``file_reader.cobol.reader.end_offset`` + An offset from the end of the record to the end of the binary data block. + + * Type: int + * Default: ``0`` + * Importance: medium + +``file_reader.cobol.reader.file_start_offset`` + A number of bytes to skip at the beginning of each file. + + * Type: int + * Default: ``0`` + * Importance: medium + +``file_reader.cobol.reader.file_end_offset`` + A number of bytes to skip at the end of each file. + + * Type: int + * Default: ``0`` + * Importance: medium + +``file_reader.cobol.reader.ebcdic_code_page_class`` + Custom code page conversion class provided. + + * Type: string + * Default: ``null`` + * Importance: low + +``file_reader.cobol.reader.ascii_charset`` + Charset for ASCII data. + + * Type: string + * Default: ```` + * Importance: low + +``file_reader.cobol.reader.is_uft16_big_endian`` + Flag to consider UTF-16 strings as big-endian. + + * Type: boolean + * Default: ``true`` + * Importance: low + +``file_reader.cobol.reader.variable_size_occurs`` + If true, occurs depending on data size will depend on the number of elements. + + * Type: boolean + * Default: ``false`` + * Importance: low + +``file_reader.cobol.reader.length_field_name`` + The name for a field that contains the record length. If not set, the copybook record length will be used. + + * Type: string + * Default: ``null`` + * Importance: low + +``file_reader.cobol.reader.is_rdw_big_endian`` + If the RDW is big endian. + + * Type: boolean + * Default: ``false`` + * Importance: low + +``file_reader.cobol.reader.is_rdw_part_rec_length`` + If the RDW count itself as part of record length itself. + + * Type: boolean + * Default: ``false`` + * Importance: low + +``file_reader.cobol.reader.rdw_adjustment`` + Controls a mismatch between RDW and record length. + + * Type: int + * Default: ``0`` + * Importance: low + +``file_reader.cobol.reader.is_index_generation_needed`` + If the indexing input file before processing is requested. + + * Type: boolean + * Default: ``false`` + * Importance: low + +``file_reader.cobol.reader.input_split_records`` + The number of records to include in each partition. 
+ + * Type: int + * Default: ``null`` + * Importance: low + +``file_reader.cobol.reader.input_split_size_mb`` + A partition size to target. + + * Type: int + * Default: ``null`` + * Importance: low + +``file_reader.cobol.reader.hdfs_default_block_size`` + Default HDFS block size for the HDFS filesystem used. + + * Type: int + * Default: ``null`` + * Importance: low + +``file_reader.cobol.reader.drop_group_fillers`` + If true the parser will drop all FILLER fields, even GROUP FILLERS that have non-FILLER nested fields. + + * Type: boolean + * Default: ``false`` + * Importance: low + +``file_reader.cobol.reader.drop_value_fillers`` + If true the parser will drop all value FILLER fields. * Type: boolean * Default: ``true`` * Importance: low +``file_reader.cobol.reader.non_terminals`` + A comma-separated list of group-type fields to combine and parse as primitive fields. + + * Type: string[] + * Default: ``null`` + * Importance: low + +``file_reader.cobol.reader.debug_fields_policy`` + Specifies if debugging fields need to be added and what should they contain. + + * Type: enum (available values ``hex``, ``raw`` and ``none``) + * Default: ``none`` + * Importance: low + +``file_reader.cobol.reader.record_header_parser`` + Parser to be used to parse data field record headers. + + * Type: string + * Default: ``null`` + * Importance: low + +``file_reader.cobol.reader.rhp_additional_info`` + Extra option to be passed to a custom record header parser. + + * Type: string + * Default: ``null`` + * Importance: low + +``file_reader.cobol.reader.input_file_name_column`` + A column name to add to each record containing the input file name. + + * Type: string + * Default: ```` + * Importance: low + .. _config_options-filereaders-csv: CSV @@ -846,6 +1048,130 @@ To configure custom properties for this reader, the name you must use is ``delim * Default: ``true`` * Importance: low +.. _config_options-filereaders-json: + +JSON +-------------------------------------------- + +To configure custom properties for this reader, the name you must use is ``json``. + +``file_reader.json.record_per_line`` + If enabled, the reader will read each line as a record. Otherwise, the reader will read the full + content of the file as a record. + + * Type: boolean + * Default: ``true`` + * Importance: medium + +``file_reader.json.deserialization.`` + Deserialization feature to use when reading a JSON file. You can add as much as you like + based on the ones defined `here. `__ + + * Type: boolean + * Importance: medium + +``file_reader.json.encoding`` + Encoding to use for reading a file. If not specified, the reader will use the default encoding. + + * Type: string + * Default: based on the locale and charset of the underlying operating system. + * Importance: medium + +``file_reader.json.compression.type`` + Compression type to use when reading a file. + + * Type: enum (available values ``bzip2``, ``gzip`` and ``none``) + * Default: ``none`` + * Importance: medium + +``file_reader.json.compression.concatenated`` + Flag to specify if the decompression of the reader will finish at the end of the file or after + the first compressed stream. + + * Type: boolean + * Default: ``true`` + * Importance: low + +.. _config_options-filereaders-xml: + +XML +-------------------------------------------- + +To configure custom properties for this reader, the name you must use is ``xml``. + +``file_reader.xml.record_per_line`` + If enabled, the reader will read each line as a record. 
Otherwise, the reader will read the full + content of the file as a record. + + * Type: boolean + * Default: ``true`` + * Importance: medium + +``file_reader.xml.deserialization.`` + Deserialization feature to use when reading a XML file. You can add as much as you like + based on the ones defined `here. `__ + + * Type: boolean + * Importance: medium + +``file_reader.xml.encoding`` + Encoding to use for reading a file. If not specified, the reader will use the default encoding. + + * Type: string + * Default: based on the locale and charset of the underlying operating system. + * Importance: medium + +``file_reader.xml.compression.type`` + Compression type to use when reading a file. + + * Type: enum (available values ``bzip2``, ``gzip`` and ``none``) + * Default: ``none`` + * Importance: medium + +``file_reader.xml.compression.concatenated`` + Flag to specify if the decompression of the reader will finish at the end of the file or after + the first compressed stream. + + * Type: boolean + * Default: ``true`` + * Importance: low + +.. _config_options-filereaders-yaml: + +YAML +-------------------------------------------- + +To configure custom properties for this reader, the name you must use is ``yaml``. + +``file_reader.yaml.deserialization.`` + Deserialization feature to use when reading a YAML file. You can add as much as you like + based on the ones defined `here. `__ + + * Type: boolean + * Importance: medium + +``file_reader.yaml.encoding`` + Encoding to use for reading a file. If not specified, the reader will use the default encoding. + + * Type: string + * Default: based on the locale and charset of the underlying operating system. + * Importance: medium + +``file_reader.yaml.compression.type`` + Compression type to use when reading a file. + + * Type: enum (available values ``bzip2``, ``gzip`` and ``none``) + * Default: ``none`` + * Importance: medium + +``file_reader.yaml.compression.concatenated`` + Flag to specify if the decompression of the reader will finish at the end of the file or after + the first compressed stream. + + * Type: boolean + * Default: ``true`` + * Importance: low + .. _config_options-filereaders-text: Text @@ -900,59 +1226,80 @@ To configure custom properties for this reader, the name you must use is ``agnos ``file_reader.agnostic.extensions.parquet`` A comma-separated string list with the accepted extensions for Parquet files. - * Type: string + * Type: string[] * Default: ``parquet`` * Importance: medium ``file_reader.agnostic.extensions.avro`` A comma-separated string list with the accepted extensions for Avro files. - * Type: string + * Type: string[] * Default: ``avro`` * Importance: medium ``file_reader.agnostic.extensions.orc`` A comma-separated string list with the accepted extensions for ORC files. - * Type: string + * Type: string[] * Default: ``orc`` * Importance: medium ``file_reader.agnostic.extensions.sequence`` A comma-separated string list with the accepted extensions for Sequence files. - * Type: string + * Type: string[] * Default: ``seq`` * Importance: medium -``file_reader.agnostic.extensions.json`` - A comma-separated string list with the accepted extensions for JSON files. +``file_reader.agnostic.extensions.cobol`` + A comma-separated string list with the accepted extensions for Cobol files. - * Type: string - * Default: ``json`` + * Type: string[] + * Default: ``dat`` * Importance: medium ``file_reader.agnostic.extensions.csv`` A comma-separated string list with the accepted extensions for CSV files. 
- * Type: string + * Type: string[] * Default: ``csv`` * Importance: medium ``file_reader.agnostic.extensions.tsv`` A comma-separated string list with the accepted extensions for TSV files. - * Type: string + * Type: string[] * Default: ``tsv`` * Importance: medium ``file_reader.agnostic.extensions.fixed`` A comma-separated string list with the accepted extensions for fixed-width files. - * Type: string + * Type: string[] * Default: ``fixed`` * Importance: medium +``file_reader.agnostic.extensions.json`` + A comma-separated string list with the accepted extensions for JSON files. + + * Type: string[] + * Default: ``json`` + * Importance: medium + +``file_reader.agnostic.extensions.xml`` + A comma-separated string list with the accepted extensions for XML files. + + * Type: string[] + * Default: ``xml`` + * Importance: medium + +``file_reader.agnostic.extensions.yaml`` + A comma-separated string list with the accepted extensions for YAML files. + + * Type: string[] + * Default: ``yaml`` + * Importance: medium + .. note:: The Agnostic reader uses the previous ones as inner readers. So, in case of using this reader, you'll probably need to include also the specified properties for those readers in the connector configuration as well. diff --git a/docs/source/connector.rst b/docs/source/connector.rst index 948dcb7..f5e7891 100644 --- a/docs/source/connector.rst +++ b/docs/source/connector.rst @@ -4,7 +4,7 @@ Connector ******************************************** -The connector takes advantage of the abstraction provided from `Hadoop Common `__ +The connector takes advantage of the abstraction provided from `Hadoop Common `__ using the implementation of the ``org.apache.hadoop.fs.FileSystem`` class. So, it's possible to use a wide variety of FS or if your FS is not included in the Hadoop Common API you can implement an extension of this abstraction and using it in a transparent way. @@ -26,8 +26,8 @@ Getting started Prerequisites -------------------------------------------- -- Apache Kafka 2.5.0 -- Java 8 +- Apache Kafka 2.6.0. +- Java 8. - Confluent Schema Registry (recommended). Building from source @@ -53,6 +53,7 @@ The ``kafka-connect-fs.properties`` file defines the following properties as req policy.recursive=true policy.regexp=.* policy.batch_size=0 + policy.cleanup=none file_reader.class= file_reader.batch_size=0 @@ -68,6 +69,7 @@ The ``kafka-connect-fs.properties`` file defines the following properties as req #. Flag to activate traversed recursion in subdirectories when listing files. #. Regular expression to filter files from the FS. #. Number of files that should be handled at a time. Non-positive values disable batching. +#. Cleanup strategy to manage processed files. #. File reader class to read files from the FS (must implement ``com.github.mmolimar.kafka.connect.fs.file.reader.FileReader`` interface). #. Number of records to process at a time. Non-positive values disable batching. @@ -116,11 +118,11 @@ Policies In order to ingest data from the FS(s), the connector needs a **policy** to define the rules to do it. -Basically, the policy tries to connect to each FS included in ``fs.uris`` connector property, lists files +Basically, the policy tries to connect to each FS included in the ``fs.uris`` connector property, lists files (and filter them using the regular expression provided in the ``policy.regexp`` property) and enables -a file reader to read records from them. +a file reader to read records. 
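
As a minimal sketch, a connector using the new ``move`` cleanup strategy could be configured as follows (the URIs and the filename prefix are placeholder values, not part of this change set)::

    fs.uris=file:///data/input
    policy.class=com.github.mmolimar.kafka.connect.fs.policy.SimplePolicy
    policy.recursive=true
    policy.regexp=^.*\.txt$
    policy.batch_size=0
    policy.cleanup=move
    policy.cleanup.move=file:///data/processed
    policy.cleanup.move.prefix=done_
    file_reader.class=com.github.mmolimar.kafka.connect.fs.file.reader.TextFileReader
    file_reader.batch_size=0

With this setup, skipped files would be moved into the target directory with the configured prefix prepended to their names.
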
-The policy to be used by the connector is defined in ``policy.class`` connector property. +The policy to be used by the connector is defined in the ``policy.class`` connector property. .. important:: When delivering records from the connector to Kafka, they contain their own file offset so, if in the next eventual policy execution this file is processed again, @@ -142,13 +144,28 @@ File readers They read files and process each record from the FS. The **file reader** is needed by the policy to enable the connector to process each record and includes in the implementation how to seek and iterate over the -records in the file. +records within the file. -The file reader to be used when processing files is defined in ``file_reader.class`` connector property. +The file reader to be used when processing files is defined in the ``file_reader.class`` connector property. -In the same way as the policies, the connector provides several sort of readers to parse and read records +In the same way as policies, the connector provides several sort of readers to parse and read records for different file formats. If you don't have a file reader that fits your needs, just implement one with the unique restriction that it must implement the interface ``com.github.mmolimar.kafka.connect.fs.file.reader.FileReader``. +The are several file readers included which can read the following file formats: + +* Parquet. +* Avro. +* ORC. +* SequenceFile. +* Cobol / EBCDIC. +* CSV. +* TSV. +* Fixed-width. +* JSON. +* XML. +* YAML. +* Text. + .. include:: filereaders.rst diff --git a/docs/source/filereaders.rst b/docs/source/filereaders.rst index d38a5e9..d1297a2 100644 --- a/docs/source/filereaders.rst +++ b/docs/source/filereaders.rst @@ -1,15 +1,3 @@ -Avro -^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ - -Files with `Avro `__ format can be read with this reader. - -The Avro schema is not needed due to is read from the file. The message sent -to Kafka is created by transforming the record by means of -`Confluent avro-converter `__ -API. - -More information about properties of this file reader :ref:`here`. - Parquet ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ @@ -19,13 +7,20 @@ The reader takes advantage of the Parquet-Avro API and uses the Parquet file as if it was an Avro file, so the message sent to Kafka is built in the same way as the Avro file reader does. -.. warning:: Seeking Parquet files is a heavy task because the reader has to - iterate over all records. If the policy processes the same file - over and over again and has to seek the file, the performance - can be affected. - More information about properties of this file reader :ref:`here`. +Avro +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + +Files with `Avro `__ format can be read with this reader. + +The Avro schema is not needed due to is read from the file. The message sent +to Kafka is created by transforming the record by means of +`Confluent avro-converter `__ +API. + +More information about properties of this file reader :ref:`here`. + ORC ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ @@ -46,27 +41,29 @@ SequenceFile ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ `Sequence files `__ are one kind of -the Hadoop file formats which are serialized in key/value pairs. +the Hadoop file formats which are serialized in key-value pairs. This reader can process this file format and build a Kafka message with the -key/value pair. These two values are named ``key`` and ``value`` in the message +key-value pair. 
These two values are named ``key`` and ``value`` in the message by default but you can customize these field names. More information about properties of this file reader :ref:`here`. -JSON +Cobol ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ -Reads JSON files which might contain multiple number of fields with their specified -data types. The schema for this sort of records is inferred reading the first record -and marked as optional in the schema all the fields contained. +Mainframe files (Cobol / EBCDIC binary files) can be processed with this reader which uses the +`Cobrix `__ parser. -More information about properties of this file reader :ref:`here`. +By means of the corresponding copybook -representing its schema-, it parses each record and +translate it into a Kafka message with the schema. + +More information about properties of this file reader :ref:`here`. CSV ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ -CSV file reader using a custom token to distinguish different columns on each line. +CSV file reader using a custom token to distinguish different columns in each line. It allows to distinguish a header in the files and set the name of their columns in the message sent to Kafka. If there is no header, the value of each column will be in @@ -80,7 +77,7 @@ More information about properties of this file reader :ref:`here`__. More information about properties of this file reader :ref:`here`. +JSON +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + +Reads JSON files which might contain multiple number of fields with their specified +data types. The schema for this sort of records is inferred reading the first record +and marked as optional in the schema all the fields contained. + +More information about properties of this file reader :ref:`here`. + +XML +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + +Reads XML files which might contain multiple number of fields with their specified +data types. The schema for this sort of records is inferred reading the first record +and marked as optional in the schema all the fields contained. + +.. warning:: Take into account the current + `limitations `__. + +More information about properties of this file reader :ref:`here`. + +YAML +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + +Reads YAML files which might contain multiple number of fields with their specified +data types. The schema for this sort of records is inferred reading the first record +and marked as optional in the schema all the fields contained. + +More information about properties of this file reader :ref:`here`. + Text ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ @@ -116,8 +143,8 @@ Agnostic Actually, this reader is a wrapper of the readers listing above. It tries to read any kind of file format using an internal reader based on the file extension, -applying the proper one (Parquet, Avro, ORC, SequenceFile, CSV, TSV or Text). In case of no -extension has been matched, the Text file reader will be applied. +applying the proper one (Parquet, Avro, ORC, SequenceFile, Cobol / EBCDIC, CSV, TSV, FixedWidth, JSON, XML, +YAML, or Text). In case of no extension has been matched, the Text file reader will be applied. Default extensions for each format (configurable): @@ -125,10 +152,13 @@ Default extensions for each format (configurable): * Avro: ``.avro`` * ORC: ``.orc`` * SequenceFile: ``.seq`` -* JSON: ``.json`` +* Cobol / EBCDIC: ``.dat`` * CSV: ``.csv`` * TSV: ``.tsv`` * FixedWidth: ``.fixed`` +* JSON: ``.json`` +* XML: ``.xml`` +* YAML: ``.yaml`` * Text: any other sort of file extension. 
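
These defaults can be overridden per format through the ``file_reader.agnostic.extensions.*`` properties; as an illustrative sketch (the additional extensions below are hypothetical)::

    file_reader.class=com.github.mmolimar.kafka.connect.fs.file.reader.AgnosticFileReader
    file_reader.agnostic.extensions.json=json,jsonl
    file_reader.agnostic.extensions.yaml=yaml,yml
    file_reader.agnostic.extensions.cobol=dat,ebcdic
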
More information about properties of this file reader :ref:`here`. diff --git a/docs/source/index.rst b/docs/source/index.rst index cd5bdb9..2ed199c 100644 --- a/docs/source/index.rst +++ b/docs/source/index.rst @@ -11,7 +11,7 @@ The connector supports: * Several sort of File Systems (FS) to use. * Dynamic and static URIs to ingest data from. -* Policies to define rules about how to look for files. +* Policies to define rules about how to look for files and clean them up after processing. * File readers to parse and read different kind of file formats. To learn more about the connector you can read :ref:`this section` and for more detailed diff --git a/docs/source/policies.rst b/docs/source/policies.rst index 1a5f654..3225de6 100644 --- a/docs/source/policies.rst +++ b/docs/source/policies.rst @@ -17,7 +17,7 @@ Cron ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ This policy is scheduled based on cron expressions and their format to put in the configuration -are based on the library `Quartz Scheduler `__ +are based on the library `Quartz Scheduler `__. After finishing each execution, the policy gets slept until the next one is scheduled, if applicable. diff --git a/pom.xml b/pom.xml index 8d1c2dc..0afe9f9 100644 --- a/pom.xml +++ b/pom.xml @@ -4,7 +4,7 @@ com.github.mmolimar.kafka.connect kafka-connect-fs - 1.1.0 + 1.2.0 jar kafka-connect-fs @@ -46,22 +46,26 @@ UTF-8 - 2.5.0 - 5.5.0 - 3.2.1 - hadoop3-2.1.3 - 1.11.0 + 2.6.0 + 5.5.1 + 3.3.0 + hadoop3-2.1.5 + 1.11.1 1.6.3 - 2.8.4 - 9.0.2 - 0.1.54 - 5.6.2 + 2.9.0 + 2.10.2 + 2.1.1 + 2.12.12 + 9.1.1 + 0.1.55 + 5.7.0 4.2 2.0.7 1.8 ${maven-compiler.source} 3.2.0 3.8.1 + 4.4.0 3.3.0 0.8.5 4.3.0 @@ -121,6 +125,21 @@ univocity-parsers ${univocity.version} + + com.fasterxml.jackson.dataformat + jackson-dataformat-xml + ${jackson-dataformat.version} + + + com.fasterxml.jackson.dataformat + jackson-dataformat-yaml + ${jackson-dataformat.version} + + + za.co.absa.cobrix + cobol-parser_2.12 + ${cobrix.version} + com.cronutils cron-utils @@ -132,7 +151,7 @@ ${jsch.version} - + org.junit.jupiter junit-jupiter @@ -161,6 +180,24 @@ + + net.alchim31.maven + scala-maven-plugin + ${maven-scala-plugin.version} + + ${scala.version} + + + + scala-compile-first + process-resources + + add-source + compile + + + + org.apache.maven.plugins maven-jar-plugin @@ -183,6 +220,14 @@ ${maven-compiler.source} ${maven-compiler.target} + + + compile + + compile + + + org.apache.maven.plugins @@ -226,6 +271,12 @@ org.eluder.coveralls coveralls-maven-plugin ${maven-coveralls-plugin.version} + + + ${basedir}/src/main/java + ${basedir}/src/main/scala + + io.confluent @@ -241,11 +292,18 @@ Kafka Connect FileSystem https://kafka-connect-fs.readthedocs.io https://github.com/mmolimar/kafka-connect-fs - + + + The following file types are supported: Parquet, Avro, ORC, SequenceFile, + Cobol / EBCDIC, CSV, TSV, Fixed-width, JSON, XML, YAML and Text. + + Also, the connector has built-in support for file systems such as HDFS, S3, + Google Cloud Storage, Azure Blob Storage, Azure Data Lake Store, FTP, SFTP and + local file system, among others. 
+ ]]> https://github.com/mmolimar/kafka-connect-fs Mario Molina @@ -277,14 +335,18 @@ azure sftp ftp - txt - csv - tsv - json - avro parquet + avro orc sequence + cobol + csv + tsv + fixed + json + xml + yaml + txt diff --git a/src/main/java/com/github/mmolimar/kafka/connect/fs/FsSourceTask.java b/src/main/java/com/github/mmolimar/kafka/connect/fs/FsSourceTask.java index bb74d5e..979695a 100644 --- a/src/main/java/com/github/mmolimar/kafka/connect/fs/FsSourceTask.java +++ b/src/main/java/com/github/mmolimar/kafka/connect/fs/FsSourceTask.java @@ -89,7 +89,7 @@ public List poll() { Map partitionKey = makePartitionKey.apply(metadata); Map offset = Optional.ofNullable(offsets.get(partitionKey)).orElse(new HashMap<>()); try (FileReader reader = policy.offer(metadata, offset)) { - log.info("{} Processing records for file {}...", this, metadata); + if (reader.hasNext()) log.info("{} Processing records for file {}...", this, metadata); while (reader.hasNext()) { Struct record = reader.next(); // TODO change FileReader interface in the next major version @@ -97,7 +97,7 @@ public List poll() { ((AbstractFileReader) reader).hasNextBatch() || reader.hasNext() : reader.hasNext(); records.add(convert(metadata, reader.currentOffset(), !hasNext, record)); } - } catch (ConnectException | IOException e) { + } catch (IOException | ConnectException e) { // when an exception happens reading a file, the connector continues log.warn("{} Error reading file [{}]: {}. Keep going...", this, metadata.getPath(), e.getMessage(), e); diff --git a/src/main/java/com/github/mmolimar/kafka/connect/fs/FsSourceTaskConfig.java b/src/main/java/com/github/mmolimar/kafka/connect/fs/FsSourceTaskConfig.java index f3b56ed..b5c434b 100644 --- a/src/main/java/com/github/mmolimar/kafka/connect/fs/FsSourceTaskConfig.java +++ b/src/main/java/com/github/mmolimar/kafka/connect/fs/FsSourceTaskConfig.java @@ -25,6 +25,18 @@ public class FsSourceTaskConfig extends FsSourceConnectorConfig { private static final String POLICY_BATCH_SIZE_DOC = "Number of files to process at a time. 
Non-positive values disable batching."; private static final String POLICY_BATCH_SIZE_DISPLAY = "Files per batch"; + public static final String POLICY_CLEANUP = POLICY_PREFIX + "cleanup"; + private static final String POLICY_CLEANUP_DOC = "Cleanup strategy to use when skipping files."; + private static final String POLICY_CLEANUP_DISPLAY = "Cleanup strategy"; + + public static final String POLICY_CLEANUP_MOVE_DIR = POLICY_CLEANUP + ".move"; + private static final String POLICY_CLEANUP_MOVE_DIR_DOC = "Target directory to move files for MOVE cleanup strategy."; + private static final String POLICY_CLEANUP_MOVE_DIR_DISPLAY = "Target directory"; + + public static final String POLICY_CLEANUP_MOVE_DIR_PREFIX = POLICY_CLEANUP_MOVE_DIR + ".prefix"; + private static final String POLICY_CLEANUP_MOVE_DIR_PREFIX_DOC = "Prefix to set to moved files."; + private static final String POLICY_CLEANUP_MOVE_DIR_PREFIX_DISPLAY = "File prefix"; + public static final String POLICY_PREFIX_FS = POLICY_PREFIX + "fs."; public static final String FILE_READER_CLASS = FILE_READER_PREFIX + "class"; @@ -95,6 +107,36 @@ public static ConfigDef conf() { ++order, ConfigDef.Width.MEDIUM, POLICY_BATCH_SIZE_DISPLAY + ).define( + POLICY_CLEANUP, + ConfigDef.Type.STRING, + null, + ConfigDef.Importance.MEDIUM, + POLICY_CLEANUP_DOC, + POLICY_GROUP, + ++order, + ConfigDef.Width.MEDIUM, + POLICY_CLEANUP_DISPLAY + ).define( + POLICY_CLEANUP_MOVE_DIR, + ConfigDef.Type.STRING, + null, + ConfigDef.Importance.MEDIUM, + POLICY_CLEANUP_MOVE_DIR_DOC, + POLICY_GROUP, + ++order, + ConfigDef.Width.MEDIUM, + POLICY_CLEANUP_MOVE_DIR_DISPLAY + ).define( + POLICY_CLEANUP_MOVE_DIR_PREFIX, + ConfigDef.Type.STRING, + null, + ConfigDef.Importance.LOW, + POLICY_CLEANUP_MOVE_DIR_PREFIX_DOC, + POLICY_GROUP, + ++order, + ConfigDef.Width.MEDIUM, + POLICY_CLEANUP_MOVE_DIR_PREFIX_DISPLAY ).define( FILE_READER_CLASS, ConfigDef.Type.CLASS, diff --git a/src/main/java/com/github/mmolimar/kafka/connect/fs/file/reader/AgnosticFileReader.java b/src/main/java/com/github/mmolimar/kafka/connect/fs/file/reader/AgnosticFileReader.java index 2d0dbe5..8761a9d 100644 --- a/src/main/java/com/github/mmolimar/kafka/connect/fs/file/reader/AgnosticFileReader.java +++ b/src/main/java/com/github/mmolimar/kafka/connect/fs/file/reader/AgnosticFileReader.java @@ -23,15 +23,18 @@ public class AgnosticFileReader extends AbstractFileReader reader; - private Set parquetExtensions, avroExtensions, sequenceExtensions, orcExtensions, - jsonExtensions, csvExtensions, tsvExtensions, fixedExtensions; + private Set parquetExtensions, avroExtensions, sequenceExtensions, orcExtensions, cobolExtensions, + csvExtensions, tsvExtensions, fixedExtensions, jsonExtensions, xmlExtensions, yamlExtensions; public AgnosticFileReader(FileSystem fs, Path filePath, Map config) throws Exception { super(fs, filePath, new AgnosticAdapter(), config); @@ -57,14 +60,20 @@ private AbstractFileReader readerByExtension(FileSystem fs, Path filePat clz = SequenceFileReader.class; } else if (orcExtensions.contains(extension)) { clz = OrcFileReader.class; - } else if (jsonExtensions.contains(extension)) { - clz = JsonFileReader.class; + } else if (cobolExtensions.contains(extension)) { + clz = CobolFileReader.class; } else if (csvExtensions.contains(extension)) { clz = CsvFileReader.class; } else if (tsvExtensions.contains(extension)) { clz = TsvFileReader.class; } else if (fixedExtensions.contains(extension)) { clz = FixedWidthFileReader.class; + } else if (jsonExtensions.contains(extension)) { + clz = 
JsonFileReader.class; + } else if (xmlExtensions.contains(extension)) { + clz = XmlFileReader.class; + } else if (yamlExtensions.contains(extension)) { + clz = YamlFileReader.class; } else { clz = TextFileReader.class; } @@ -82,7 +91,7 @@ protected void configure(Map config) { .toLowerCase().split(",")).collect(Collectors.toSet()); this.orcExtensions = Arrays.stream(config.getOrDefault(FILE_READER_AGNOSTIC_EXTENSIONS_ORC, "orc") .toLowerCase().split(",")).collect(Collectors.toSet()); - this.jsonExtensions = Arrays.stream(config.getOrDefault(FILE_READER_AGNOSTIC_EXTENSIONS_JSON, "json") + this.cobolExtensions = Arrays.stream(config.getOrDefault(FILE_READER_AGNOSTIC_EXTENSIONS_COBOL, "dat") .toLowerCase().split(",")).collect(Collectors.toSet()); this.csvExtensions = Arrays.stream(config.getOrDefault(FILE_READER_AGNOSTIC_EXTENSIONS_CSV, "csv") .toLowerCase().split(",")).collect(Collectors.toSet()); @@ -90,6 +99,12 @@ protected void configure(Map config) { .toLowerCase().split(",")).collect(Collectors.toSet()); this.fixedExtensions = Arrays.stream(config.getOrDefault(FILE_READER_AGNOSTIC_EXTENSIONS_FIXED, "fixed") .toLowerCase().split(",")).collect(Collectors.toSet()); + this.jsonExtensions = Arrays.stream(config.getOrDefault(FILE_READER_AGNOSTIC_EXTENSIONS_JSON, "json") + .toLowerCase().split(",")).collect(Collectors.toSet()); + this.xmlExtensions = Arrays.stream(config.getOrDefault(FILE_READER_AGNOSTIC_EXTENSIONS_XML, "xml") + .toLowerCase().split(",")).collect(Collectors.toSet()); + this.yamlExtensions = Arrays.stream(config.getOrDefault(FILE_READER_AGNOSTIC_EXTENSIONS_YAML, "yaml") + .toLowerCase().split(",")).collect(Collectors.toSet()); } @Override diff --git a/src/main/java/com/github/mmolimar/kafka/connect/fs/file/reader/CobolFileReader.java b/src/main/java/com/github/mmolimar/kafka/connect/fs/file/reader/CobolFileReader.java new file mode 100644 index 0000000..0c835df --- /dev/null +++ b/src/main/java/com/github/mmolimar/kafka/connect/fs/file/reader/CobolFileReader.java @@ -0,0 +1,393 @@ +package com.github.mmolimar.kafka.connect.fs.file.reader; + +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.kafka.connect.data.Schema; +import org.apache.kafka.connect.data.SchemaBuilder; +import org.apache.kafka.connect.data.Struct; +import org.apache.kafka.connect.errors.ConnectException; +import scala.collection.Seq; +import scala.math.BigDecimal; +import scala.math.ScalaNumber; +import za.co.absa.cobrix.cobol.parser.ast.Group; +import za.co.absa.cobrix.cobol.parser.ast.Primitive; +import za.co.absa.cobrix.cobol.parser.ast.Statement; +import za.co.absa.cobrix.cobol.parser.ast.datatype.AlphaNumeric; +import za.co.absa.cobrix.cobol.parser.ast.datatype.COMP1; +import za.co.absa.cobrix.cobol.parser.ast.datatype.Decimal; +import za.co.absa.cobrix.cobol.parser.ast.datatype.Integral; +import za.co.absa.cobrix.cobol.parser.common.Constants; +import za.co.absa.cobrix.cobol.parser.decoders.FloatingPointFormat$; +import za.co.absa.cobrix.cobol.parser.encoding.RAW$; +import za.co.absa.cobrix.cobol.parser.policies.CommentPolicy; +import za.co.absa.cobrix.cobol.parser.policies.DebugFieldsPolicy$; +import za.co.absa.cobrix.cobol.parser.policies.StringTrimmingPolicy$; +import za.co.absa.cobrix.cobol.reader.VarLenReader; +import za.co.absa.cobrix.cobol.reader.extractors.record.RecordHandler; +import za.co.absa.cobrix.cobol.reader.parameters.ReaderParameters; +import 
za.co.absa.cobrix.cobol.reader.policies.SchemaRetentionPolicy$; +import za.co.absa.cobrix.cobol.reader.schema.CobolSchema; +import za.co.absa.cobrix.cobol.reader.stream.SimpleStream; + +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.util.*; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +import static com.github.mmolimar.kafka.connect.fs.FsSourceTaskConfig.FILE_READER_PREFIX; +import static scala.collection.JavaConverters.*; + +public class CobolFileReader extends AbstractFileReader { + + private static final String FILE_READER_COBOL = FILE_READER_PREFIX + "cobol."; + private static final String FILE_READER_COBOL_READER = FILE_READER_COBOL + "reader."; + private static final String FILE_READER_COBOL_COPYBOOK_PREFIX = FILE_READER_COBOL + "copybook."; + + public static final String FILE_READER_COBOL_COPYBOOK_CONTENT = FILE_READER_COBOL_COPYBOOK_PREFIX + "content"; + public static final String FILE_READER_COBOL_COPYBOOK_PATH = FILE_READER_COBOL_COPYBOOK_PREFIX + "path"; + + public static final String FILE_READER_COBOL_READER_IS_EBCDIC = FILE_READER_COBOL_READER + "is_ebcdic"; + public static final String FILE_READER_COBOL_READER_EBCDIC_CODE_PAGE = FILE_READER_COBOL_READER + "ebcdic_code_page"; + public static final String FILE_READER_COBOL_READER_EBCDIC_CODE_PAGE_CLASS = FILE_READER_COBOL_READER + "ebcdic_code_page_class"; + public static final String FILE_READER_COBOL_READER_ASCII_CHARSET = FILE_READER_COBOL_READER + "ascii_charset"; + public static final String FILE_READER_COBOL_READER_IS_UFT16_BIG_ENDIAN = FILE_READER_COBOL_READER + "is_uft16_big_endian"; + public static final String FILE_READER_COBOL_READER_FLOATING_POINT_FORMAT = FILE_READER_COBOL_READER + "floating_point_format"; + public static final String FILE_READER_COBOL_READER_VARIABLE_SIZE_OCCURS = FILE_READER_COBOL_READER + "variable_size_occurs"; + public static final String FILE_READER_COBOL_READER_LENGTH_FIELD_NAME = FILE_READER_COBOL_READER + "length_field_name"; + public static final String FILE_READER_COBOL_READER_IS_RECORD_SEQUENCE = FILE_READER_COBOL_READER + "is_record_sequence"; + public static final String FILE_READER_COBOL_READER_IS_RDW_BIG_ENDIAN = FILE_READER_COBOL_READER + "is_rdw_big_endian"; + public static final String FILE_READER_COBOL_READER_IS_RDW_PART_REC_LENGTH = FILE_READER_COBOL_READER + "is_rdw_part_rec_length"; + public static final String FILE_READER_COBOL_READER_RDW_ADJUSTMENT = FILE_READER_COBOL_READER + "rdw_adjustment"; + public static final String FILE_READER_COBOL_READER_IS_INDEX_GENERATION_NEEDED = FILE_READER_COBOL_READER + "is_index_generation_needed"; + public static final String FILE_READER_COBOL_READER_INPUT_SPLIT_RECORDS = FILE_READER_COBOL_READER + "input_split_records"; + public static final String FILE_READER_COBOL_READER_INPUT_SPLIT_SIZE_MB = FILE_READER_COBOL_READER + "input_split_size_mb"; + public static final String FILE_READER_COBOL_READER_HDFS_DEFAULT_BLOCK_SIZE = FILE_READER_COBOL_READER + "hdfs_default_block_size"; + public static final String FILE_READER_COBOL_READER_START_OFFSET = FILE_READER_COBOL_READER + "start_offset"; + public static final String FILE_READER_COBOL_READER_END_OFFSET = FILE_READER_COBOL_READER + "end_offset"; + public static final String FILE_READER_COBOL_READER_FILE_START_OFFSET = FILE_READER_COBOL_READER + "file_start_offset"; + public static final String FILE_READER_COBOL_READER_FILE_END_OFFSET = FILE_READER_COBOL_READER + 
"file_end_offset"; + public static final String FILE_READER_COBOL_READER_SCHEMA_POLICY = FILE_READER_COBOL_READER + "schema_policy"; + public static final String FILE_READER_COBOL_READER_STRING_TRIMMING_POLICY = FILE_READER_COBOL_READER + "string_trimming_policy"; + public static final String FILE_READER_COBOL_READER_DROP_GROUP_FILLERS = FILE_READER_COBOL_READER + "drop_group_fillers"; + public static final String FILE_READER_COBOL_READER_DROP_VALUE_FILLERS = FILE_READER_COBOL_READER + "drop_value_fillers"; + public static final String FILE_READER_COBOL_READER_NON_TERMINALS = FILE_READER_COBOL_READER + "non_terminals"; + public static final String FILE_READER_COBOL_READER_DEBUG_FIELDS_POLICY = FILE_READER_COBOL_READER + "debug_fields_policy"; + public static final String FILE_READER_COBOL_READER_RECORD_HEADER_PARSER = FILE_READER_COBOL_READER + "record_header_parser"; + public static final String FILE_READER_COBOL_READER_RHP_ADDITIONAL_INFO = FILE_READER_COBOL_READER + "rhp_additional_info"; + public static final String FILE_READER_COBOL_READER_INPUT_FILE_NAME_COLUMN = FILE_READER_COBOL_READER + "input_file_name_column"; + + + private final Schema schema; + private final VarLenReader reader; + private SimpleStream stream; + private String copybook; + private Iterator> iterator; + private ReaderParameters params; + private boolean closed; + + public CobolFileReader(FileSystem fs, Path filePath, Map config) throws Exception { + super(fs, filePath, new CobolToStruct(), config); + + this.reader = CobrixReader.varLenReader(copybook, params); + this.schema = extractSchema(reader.getCobolSchema()); + this.iterator = initIterator(); + this.closed = false; + } + + private Iterator> initIterator() throws Exception { + if (stream != null) { + stream.close(); + } + stream = new FSStream(getFs(), getFilePath()); + return asJavaIterator(reader.getRecordIterator(stream, 0, 0, 0).map(it -> seqAsJavaList(it.seq()))); + } + + private Schema extractSchema(CobolSchema cobolSchema) { + SchemaBuilder builder = SchemaBuilder.struct(); + Group group; + if (params.schemaPolicy().id() == SchemaRetentionPolicy$.MODULE$.CollapseRoot().id()) { + group = (Group) cobolSchema.getCobolSchema().ast().children().head(); + } else { + group = cobolSchema.getCobolSchema().ast(); + } + seqAsJavaList(group.children()) + .forEach(child -> builder.field(child.name(), schemaForField(child))); + + return builder.build(); + } + + private Schema schemaForField(Statement statement) { + if (statement instanceof Group) { + Group group = (Group) statement; + SchemaBuilder childrenBuilder = SchemaBuilder.struct(); + seqAsJavaList(group.children()).forEach(child -> childrenBuilder.field(child.name(), schemaForField(child))); + SchemaBuilder builder; + if (group.isArray()) { + builder = SchemaBuilder.array(childrenBuilder.build()); + } else { + builder = childrenBuilder; + } + return builder.build(); + } + Primitive primitive = (Primitive) statement; + if (primitive.dataType() instanceof Integral) { + Integral dt = (Integral) primitive.dataType(); + if (dt.precision() > Constants.maxLongPrecision()) { + return Schema.OPTIONAL_FLOAT64_SCHEMA; + } else if (dt.precision() > Constants.maxIntegerPrecision()) { + return Schema.OPTIONAL_INT64_SCHEMA; + } else { + return Schema.OPTIONAL_INT32_SCHEMA; + } + } else if (primitive.dataType() instanceof Decimal) { + Decimal dt = (Decimal) primitive.dataType(); + if (dt.compact().exists(c -> c instanceof COMP1)) { + return Schema.OPTIONAL_FLOAT32_SCHEMA; + } + return Schema.OPTIONAL_FLOAT64_SCHEMA; + } 
else { + AlphaNumeric dt = (AlphaNumeric) primitive.dataType(); + if (dt.enc().exists(enc -> enc instanceof RAW$)) { + return Schema.OPTIONAL_BYTES_SCHEMA; + } + return Schema.OPTIONAL_STRING_SCHEMA; + } + } + + @Override + protected void configure(Map config) { + copybook = copybookContent(config); + params = getReaderParameters(config); + } + + private String copybookContent(Map config) { + String content = Optional.ofNullable(config.get(FILE_READER_COBOL_COPYBOOK_PATH)) + .map(Path::new) + .map(path -> { + StringBuilder sb = new StringBuilder(); + try (InputStream is = getFs().open(path); + BufferedReader br = new BufferedReader(new InputStreamReader(is))) { + br.lines().forEach(line -> sb.append(line).append("\n")); + } catch (IOException ioe) { + throw new ConnectException("Cannot read Copybook file: " + path, ioe); + } + return sb.toString(); + }) + .orElseGet(() -> config.get(FILE_READER_COBOL_COPYBOOK_CONTENT)); + + if (content == null || content.trim().isEmpty()) { + throw new ConnectException("Copybook is not specified."); + } + return content; + } + + private ReaderParameters getReaderParameters(Map config) { + return new ReaderParameters( + Boolean.parseBoolean(config.getOrDefault(FILE_READER_COBOL_READER_IS_EBCDIC, "true")), // isEbcdic + config.getOrDefault(FILE_READER_COBOL_READER_EBCDIC_CODE_PAGE, "common"), // ebcdicCodePage + scala.Option.apply(config.get(FILE_READER_COBOL_READER_EBCDIC_CODE_PAGE_CLASS)), // ebcdicCodePageClass + config.getOrDefault(FILE_READER_COBOL_READER_ASCII_CHARSET, ""), // asciiCharset + Boolean.parseBoolean(config.getOrDefault(FILE_READER_COBOL_READER_IS_UFT16_BIG_ENDIAN, "true")), // isUtf16BigEndian + FloatingPointFormat$.MODULE$.withNameOpt(config.getOrDefault(FILE_READER_COBOL_READER_FLOATING_POINT_FORMAT, "ibm")).get(), // floatingPointFormat + Boolean.parseBoolean(config.getOrDefault(FILE_READER_COBOL_READER_VARIABLE_SIZE_OCCURS, "false")), // variableSizeOccurs + scala.Option.apply(config.get(FILE_READER_COBOL_READER_LENGTH_FIELD_NAME)), // lengthFieldName + Boolean.parseBoolean(config.getOrDefault(FILE_READER_COBOL_READER_IS_RECORD_SEQUENCE, "false")), // isRecordSequence + Boolean.parseBoolean(config.getOrDefault(FILE_READER_COBOL_READER_IS_RDW_BIG_ENDIAN, "false")), // isRdwBigEndian + Boolean.parseBoolean(config.getOrDefault(FILE_READER_COBOL_READER_IS_RDW_PART_REC_LENGTH, "false")), // isRdwPartRecLength + Integer.parseInt(config.getOrDefault(FILE_READER_COBOL_READER_RDW_ADJUSTMENT, "0")), // rdwAdjustment + Boolean.parseBoolean(config.getOrDefault(FILE_READER_COBOL_READER_IS_INDEX_GENERATION_NEEDED, "false")), // isIndexGenerationNeeded + scala.Option.apply(config.get(FILE_READER_COBOL_READER_INPUT_SPLIT_RECORDS)), // inputSplitRecords + scala.Option.apply(config.get(FILE_READER_COBOL_READER_INPUT_SPLIT_SIZE_MB)), // inputSplitSizeMB + scala.Option.apply(config.get(FILE_READER_COBOL_READER_HDFS_DEFAULT_BLOCK_SIZE)), // hdfsDefaultBlockSize + Integer.parseInt(config.getOrDefault(FILE_READER_COBOL_READER_START_OFFSET, "0")), // startOffset + Integer.parseInt(config.getOrDefault(FILE_READER_COBOL_READER_END_OFFSET, "0")), // endOffset + Integer.parseInt(config.getOrDefault(FILE_READER_COBOL_READER_FILE_START_OFFSET, "0")), // fileStartOffset + Integer.parseInt(config.getOrDefault(FILE_READER_COBOL_READER_FILE_END_OFFSET, "0")), // fileEndOffset + false, // generateRecordId + SchemaRetentionPolicy$.MODULE$.withNameOpt(config.getOrDefault(FILE_READER_COBOL_READER_SCHEMA_POLICY, "keep_original")).get(), // schemaPolicy + 
StringTrimmingPolicy$.MODULE$.withNameOpt(config.getOrDefault(FILE_READER_COBOL_READER_STRING_TRIMMING_POLICY, "both")).get(), // stringTrimmingPolicy + scala.Option.apply(null), // multisegment + new CommentPolicy(true, 6, 72), // commentPolicy + Boolean.parseBoolean(config.getOrDefault(FILE_READER_COBOL_READER_DROP_GROUP_FILLERS, "false")), // dropGroupFillers + Boolean.parseBoolean(config.getOrDefault(FILE_READER_COBOL_READER_DROP_VALUE_FILLERS, "true")), // dropValueFillers + asScalaBuffer(Arrays.asList(config.getOrDefault(FILE_READER_COBOL_READER_NON_TERMINALS, "").split(","))), // nonTerminals + scala.collection.immutable.Map$.MODULE$.empty(), // occursMappings + DebugFieldsPolicy$.MODULE$.withNameOpt(config.getOrDefault(FILE_READER_COBOL_READER_DEBUG_FIELDS_POLICY, "none")).get(), // debugFieldsPolicy + scala.Option.apply(config.get(FILE_READER_COBOL_READER_RECORD_HEADER_PARSER)), // recordHeaderParser + scala.Option.apply(config.get(FILE_READER_COBOL_READER_RHP_ADDITIONAL_INFO)), // rhpAdditionalInfo + config.getOrDefault(FILE_READER_COBOL_READER_INPUT_FILE_NAME_COLUMN, "") // inputFileNameColumn + ); + } + + @Override + protected boolean hasNextRecord() { + return iterator.hasNext(); + } + + @Override + protected CobolRecord nextRecord() { + incrementOffset(); + return new CobolRecord(schema, iterator.next()); + } + + @Override + protected void seekFile(long offset) { + if (currentOffset() > offset) { + try { + iterator = initIterator(); + } catch (Exception e) { + throw new ConnectException("Error seeking file: " + getFilePath(), e); + } + closed = false; + setOffset(0); + } + while (hasNext() && currentOffset() < offset) { + nextRecord(); + } + } + + @Override + protected boolean isClosed() { + return closed; + } + + @Override + public void close() { + try { + stream.close(); + } catch (Exception e) { + log.warn("{} An error has occurred while closing file stream.", this, e); + } + closed = true; + } + + private static class FSStream implements SimpleStream { + + private final FileSystem fs; + private final Path file; + private final FSDataInputStream stream; + private final long size; + private long offset; + + FSStream(FileSystem fs, Path file) throws IOException { + this.fs = fs; + this.file = file; + this.stream = this.fs.open(file); + this.size = fs.getContentSummary(file).getLength(); + this.offset = stream.getPos(); + } + + @Override + public long size() { + return size; + } + + @Override + public long offset() { + return offset; + } + + @Override + public String inputFileName() { + return file.toString(); + } + + @Override + public byte[] next(int numberOfBytes) throws IOException { + int bytesToRead = (int) Math.min(numberOfBytes, size() - offset()); + byte[] bytes = new byte[bytesToRead]; + stream.readFully(bytes); + offset += bytesToRead; + return bytes; + } + + @Override + public void close() throws IOException { + stream.close(); + } + } + + static class CobolToStruct implements ReaderAdapter { + + public Struct apply(CobolRecord record) { + Struct struct = new Struct(record.schema); + record.row.stream() + .filter(col -> col instanceof Map) + .forEach(col -> { + Map column = (Map) col; + column.forEach((k, v) -> struct.put(k, mapValue(record.schema.field(k).schema(), k, v))); + }); + return struct; + } + + private Object mapValue(Schema schema, String fieldName, Object value) { + if (value == null) { + return null; + } else if (schema.type() == Schema.Type.ARRAY) { + List items = (List) value; + return items.stream() + .map(item -> mapValue(schema.valueSchema(), 
fieldName, ((Map) item).get(fieldName))) + .collect(Collectors.toList()); + } else if (schema.type() != Schema.Type.STRUCT) { + return value; + } + Struct struct = new Struct(schema); + Map map = (Map) value; + map.forEach((k, v) -> struct.put(k, mapValue(schema.field(k).schema(), k, v))); + return struct; + } + } + + static class CobolRecord { + + final Schema schema; + final List row; + + CobolRecord(Schema schema, List row) { + this.schema = schema; + this.row = row; + } + + } + + static class StructHandler implements RecordHandler> { + + @Override + public Map create(Object[] values, Group group) { + return Collections.singletonMap(group.name(), mapValues(group, values)); + } + + @Override + public Seq toSeq(Map record) { + return asScalaBuffer(new ArrayList<>(record.values())).toSeq(); + } + + private Map mapValues(Group group, Object[] values) { + List statements = seqAsJavaList(group.children().toSeq()); + return IntStream.range(0, values.length) + .mapToObj(index -> new AbstractMap.SimpleEntry<>(statements.get(index), values[index])) + .map(entry -> transform(entry.getKey(), entry.getValue())) + .collect(HashMap::new, (m, e) -> m.put(e.getKey(), e.getValue()), HashMap::putAll); + } + + private Map.Entry transform(Statement child, Object value) { + Object childValue; + if (child instanceof Group && value instanceof Map) { + childValue = ((Map) value).get(child.name()); + } else if (value instanceof Object[]) { + childValue = Arrays.asList((Object[]) value); + } else if (value instanceof ScalaNumber) { + childValue = value instanceof scala.math.BigDecimal ? + ((BigDecimal) value).doubleValue() : ((ScalaNumber) value).longValue(); + } else { + childValue = value; + } + return new AbstractMap.SimpleEntry<>(child.name(), childValue); + } + + } +} diff --git a/src/main/java/com/github/mmolimar/kafka/connect/fs/file/reader/FileReader.java b/src/main/java/com/github/mmolimar/kafka/connect/fs/file/reader/FileReader.java index a5fe758..1fecfe1 100644 --- a/src/main/java/com/github/mmolimar/kafka/connect/fs/file/reader/FileReader.java +++ b/src/main/java/com/github/mmolimar/kafka/connect/fs/file/reader/FileReader.java @@ -11,8 +11,6 @@ public interface FileReader extends Iterator, Closeable { Path getFilePath(); - Struct next(); - void seek(long offset); long currentOffset(); diff --git a/src/main/java/com/github/mmolimar/kafka/connect/fs/file/reader/JacksonFileReader.java b/src/main/java/com/github/mmolimar/kafka/connect/fs/file/reader/JacksonFileReader.java new file mode 100644 index 0000000..8354efe --- /dev/null +++ b/src/main/java/com/github/mmolimar/kafka/connect/fs/file/reader/JacksonFileReader.java @@ -0,0 +1,219 @@ +package com.github.mmolimar.kafka.connect.fs.file.reader; + +import com.fasterxml.jackson.databind.DeserializationFeature; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.kafka.connect.data.Schema; +import org.apache.kafka.connect.data.SchemaBuilder; +import org.apache.kafka.connect.data.Struct; + +import java.io.IOException; +import java.util.Arrays; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; +import java.util.stream.StreamSupport; + +abstract class JacksonFileReader extends AbstractFileReader { + + private final TextFileReader inner; + private final Schema schema; + private ObjectMapper mapper; + + public JacksonFileReader(FileSystem fs, Path filePath, Map config) throws IOException { 
+ super(fs, filePath, new JacksonToStruct(), config); + + config.put(TextFileReader.FILE_READER_TEXT_ENCODING, readerEncodingConfig(config)); + config.put(TextFileReader.FILE_READER_TEXT_RECORD_PER_LINE, recordPerLineConfig(config)); + config.put(TextFileReader.FILE_READER_TEXT_COMPRESSION_TYPE, compressionTypeConfig(config)); + config.put(TextFileReader.FILE_READER_TEXT_COMPRESSION_CONCATENATED, compressionConcatenatedConfig(config)); + + this.inner = new TextFileReader(fs, filePath, config); + + if (hasNext()) { + String line = inner.nextRecord().getValue(); + this.schema = extractSchema(mapper.readTree(line)); + // back to the first line + inner.seek(0); + } else { + this.schema = SchemaBuilder.struct().build(); + } + } + + protected abstract Object readerEncodingConfig(Map config); + + protected abstract Object recordPerLineConfig(Map config); + + protected abstract Object compressionTypeConfig(Map config); + + protected abstract Object compressionConcatenatedConfig(Map config); + + protected abstract String deserializationConfigPrefix(); + + protected abstract ObjectMapper getObjectMapper(); + + @Override + protected void configure(Map config) { + mapper = getObjectMapper(); + Set deserializationFeatures = Arrays.stream(DeserializationFeature.values()) + .map(Enum::name) + .collect(Collectors.toSet()); + config.entrySet().stream() + .filter(entry -> entry.getKey().startsWith(deserializationConfigPrefix())) + .forEach(entry -> { + String feature = entry.getKey().replaceAll(deserializationConfigPrefix(), ""); + if (deserializationFeatures.contains(feature)) { + mapper.configure(DeserializationFeature.valueOf(feature), + Boolean.parseBoolean(entry.getValue())); + } else { + log.warn("{} Ignoring deserialization configuration [{}] due to it does not exist.", + this, feature); + } + }); + } + + @Override + protected JacksonRecord nextRecord() throws IOException { + JsonNode value = mapper.readTree(inner.nextRecord().getValue()); + return new JacksonRecord(schema, value); + } + + @Override + public boolean hasNextRecord() throws IOException { + return inner.hasNextRecord(); + } + + @Override + public void seekFile(long offset) throws IOException { + inner.seekFile(offset); + } + + @Override + public long currentOffset() { + return inner.currentOffset(); + } + + @Override + public void close() throws IOException { + inner.close(); + } + + @Override + public boolean isClosed() { + return inner.isClosed(); + } + + private static Schema extractSchema(JsonNode jsonNode) { + switch (jsonNode.getNodeType()) { + case BOOLEAN: + return Schema.OPTIONAL_BOOLEAN_SCHEMA; + case NUMBER: + if (jsonNode.isShort()) { + return Schema.OPTIONAL_INT8_SCHEMA; + } else if (jsonNode.isInt()) { + return Schema.OPTIONAL_INT32_SCHEMA; + } else if (jsonNode.isLong()) { + return Schema.OPTIONAL_INT64_SCHEMA; + } else if (jsonNode.isBigInteger()) { + return Schema.OPTIONAL_INT64_SCHEMA; + } else { + return Schema.OPTIONAL_FLOAT64_SCHEMA; + } + case STRING: + return Schema.OPTIONAL_STRING_SCHEMA; + case BINARY: + return Schema.OPTIONAL_BYTES_SCHEMA; + case ARRAY: + Iterable elements = jsonNode::elements; + Schema arraySchema = StreamSupport.stream(elements.spliterator(), false) + .findFirst().map(JacksonFileReader::extractSchema) + .orElse(SchemaBuilder.struct().build()); + return SchemaBuilder.array(arraySchema).build(); + case OBJECT: + SchemaBuilder builder = SchemaBuilder.struct(); + jsonNode.fields() + .forEachRemaining(field -> builder.field(field.getKey(), extractSchema(field.getValue()))); + return 
builder.build(); + default: + return SchemaBuilder.struct().optional().build(); + } + } + + static class JacksonToStruct implements ReaderAdapter { + + @Override + public Struct apply(JacksonRecord record) { + return toStruct(record.schema, record.value); + } + + private Struct toStruct(Schema schema, JsonNode jsonNode) { + if (jsonNode.isNull()) return null; + Struct struct = new Struct(schema); + jsonNode.fields() + .forEachRemaining(field -> struct.put( + field.getKey(), + mapValue(struct.schema().field(field.getKey()).schema(), field.getValue()) + )); + return struct; + } + + private Object mapValue(Schema schema, JsonNode value) { + if (value == null) return null; + + switch (value.getNodeType()) { + case BOOLEAN: + return value.booleanValue(); + case NUMBER: + if (value.isShort()) { + return value.shortValue(); + } else if (value.isInt()) { + return value.intValue(); + } else if (value.isLong()) { + return value.longValue(); + } else if (value.isBigInteger()) { + return value.bigIntegerValue().longValue(); + } else { + return value.numberValue().doubleValue(); + } + case STRING: + return value.asText(); + case BINARY: + try { + return value.binaryValue(); + } catch (IOException ioe) { + throw new RuntimeException(ioe); + } + case OBJECT: + case POJO: + Struct struct = new Struct(schema); + Iterable> fields = value::fields; + StreamSupport.stream(fields.spliterator(), false) + .forEach(field -> struct.put(field.getKey(), + mapValue(extractSchema(field.getValue()), field.getValue())) + ); + return struct; + case ARRAY: + Iterable arrayElements = value::elements; + return StreamSupport.stream(arrayElements.spliterator(), false) + .map(elm -> mapValue(schema.valueSchema(), elm)) + .collect(Collectors.toList()); + case NULL: + case MISSING: + default: + return null; + } + } + } + + static class JacksonRecord { + private final Schema schema; + private final JsonNode value; + + JacksonRecord(Schema schema, JsonNode value) { + this.schema = schema; + this.value = value; + } + } +} diff --git a/src/main/java/com/github/mmolimar/kafka/connect/fs/file/reader/JsonFileReader.java b/src/main/java/com/github/mmolimar/kafka/connect/fs/file/reader/JsonFileReader.java index f1440b0..530a1a4 100644 --- a/src/main/java/com/github/mmolimar/kafka/connect/fs/file/reader/JsonFileReader.java +++ b/src/main/java/com/github/mmolimar/kafka/connect/fs/file/reader/JsonFileReader.java @@ -1,219 +1,57 @@ package com.github.mmolimar.kafka.connect.fs.file.reader; -import com.fasterxml.jackson.databind.DeserializationFeature; -import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; -import org.apache.kafka.connect.data.Schema; -import org.apache.kafka.connect.data.SchemaBuilder; -import org.apache.kafka.connect.data.Struct; import java.io.IOException; -import java.util.Arrays; import java.util.Map; -import java.util.Set; -import java.util.stream.Collectors; -import java.util.stream.StreamSupport; import static com.github.mmolimar.kafka.connect.fs.FsSourceTaskConfig.FILE_READER_PREFIX; -public class JsonFileReader extends AbstractFileReader { +public class JsonFileReader extends JacksonFileReader { private static final String FILE_READER_JSON = FILE_READER_PREFIX + "json."; private static final String FILE_READER_JSON_COMPRESSION = FILE_READER_JSON + "compression."; - public static final String FILE_READER_JSON_RECORD_PER_LINE = FILE_READER_JSON + "record_per_line"; - public static final String 
FILE_READER_JSON_DESERIALIZATION_CONFIGS = FILE_READER_JSON + "deserialization."; + static final String FILE_READER_JSON_DESERIALIZATION_CONFIGS = FILE_READER_JSON + "deserialization."; + public static final String FILE_READER_JSON_RECORD_PER_LINE = FILE_READER_JSON + "record_per_line"; public static final String FILE_READER_JSON_COMPRESSION_TYPE = FILE_READER_JSON_COMPRESSION + "type"; public static final String FILE_READER_JSON_COMPRESSION_CONCATENATED = FILE_READER_JSON_COMPRESSION + "concatenated"; public static final String FILE_READER_JSON_ENCODING = FILE_READER_JSON + "encoding"; - private final TextFileReader inner; - private final Schema schema; - private ObjectMapper mapper; - public JsonFileReader(FileSystem fs, Path filePath, Map config) throws IOException { - super(fs, filePath, new JsonToStruct(), config); - - config.put(TextFileReader.FILE_READER_TEXT_ENCODING, config.get(FILE_READER_JSON_ENCODING)); - config.put(TextFileReader.FILE_READER_TEXT_RECORD_PER_LINE, config.get(FILE_READER_JSON_RECORD_PER_LINE)); - config.put(TextFileReader.FILE_READER_TEXT_COMPRESSION_TYPE, config.get(FILE_READER_JSON_COMPRESSION_TYPE)); - config.put(TextFileReader.FILE_READER_TEXT_COMPRESSION_CONCATENATED, config.get(FILE_READER_JSON_COMPRESSION_CONCATENATED)); - - this.inner = new TextFileReader(fs, filePath, config); - - if (hasNext()) { - String line = inner.nextRecord().getValue(); - this.schema = extractSchema(mapper.readTree(line)); - // back to the first line - inner.seek(0); - } else { - this.schema = SchemaBuilder.struct().build(); - } + super(fs, filePath, config); } @Override - protected void configure(Map config) { - mapper = new ObjectMapper(); - Set deserializationFeatures = Arrays.stream(DeserializationFeature.values()) - .map(Enum::name) - .collect(Collectors.toSet()); - config.entrySet().stream() - .filter(entry -> entry.getKey().startsWith(FILE_READER_JSON_DESERIALIZATION_CONFIGS)) - .forEach(entry -> { - String feature = entry.getKey().replaceAll(FILE_READER_JSON_DESERIALIZATION_CONFIGS, ""); - if (deserializationFeatures.contains(feature)) { - mapper.configure(DeserializationFeature.valueOf(feature), - Boolean.parseBoolean(entry.getValue())); - } else { - log.warn("{} Ignoring deserialization configuration [{}] due to it does not exist.", - this, feature); - } - }); + protected Object readerEncodingConfig(Map config) { + return config.get(FILE_READER_JSON_ENCODING); } @Override - protected JsonRecord nextRecord() throws IOException { - JsonNode value = mapper.readTree(inner.nextRecord().getValue()); - return new JsonRecord(schema, value); + protected Object recordPerLineConfig(Map config) { + return config.get(FILE_READER_JSON_RECORD_PER_LINE); } @Override - public boolean hasNextRecord() throws IOException { - return inner.hasNextRecord(); + protected Object compressionTypeConfig(Map config) { + return config.get(FILE_READER_JSON_COMPRESSION_TYPE); } @Override - public void seekFile(long offset) throws IOException { - inner.seekFile(offset); + protected Object compressionConcatenatedConfig(Map config) { + return config.get(FILE_READER_JSON_COMPRESSION_CONCATENATED); } @Override - public long currentOffset() { - return inner.currentOffset(); + protected String deserializationConfigPrefix() { + return FILE_READER_JSON_DESERIALIZATION_CONFIGS; } @Override - public void close() throws IOException { - inner.close(); - } - - @Override - public boolean isClosed() { - return inner.isClosed(); - } - - private static Schema extractSchema(JsonNode jsonNode) { - switch 
(jsonNode.getNodeType()) { - case BOOLEAN: - return Schema.OPTIONAL_BOOLEAN_SCHEMA; - case NUMBER: - if (jsonNode.isShort()) { - return Schema.OPTIONAL_INT8_SCHEMA; - } else if (jsonNode.isInt()) { - return Schema.OPTIONAL_INT32_SCHEMA; - } else if (jsonNode.isLong()) { - return Schema.OPTIONAL_INT64_SCHEMA; - } else if (jsonNode.isBigInteger()) { - return Schema.OPTIONAL_INT64_SCHEMA; - } else { - return Schema.OPTIONAL_FLOAT64_SCHEMA; - } - case STRING: - return Schema.OPTIONAL_STRING_SCHEMA; - case BINARY: - return Schema.OPTIONAL_BYTES_SCHEMA; - case ARRAY: - Iterable elements = jsonNode::elements; - Schema arraySchema = StreamSupport.stream(elements.spliterator(), false) - .findFirst().map(JsonFileReader::extractSchema) - .orElse(SchemaBuilder.struct().build()); - return SchemaBuilder.array(arraySchema).build(); - case OBJECT: - SchemaBuilder builder = SchemaBuilder.struct(); - jsonNode.fields() - .forEachRemaining(field -> builder.field(field.getKey(), extractSchema(field.getValue()))); - return builder.build(); - default: - return SchemaBuilder.struct().optional().build(); - } - } - - static class JsonToStruct implements ReaderAdapter { - - @Override - public Struct apply(JsonRecord record) { - return toStruct(record.schema, record.value); - } - - private Struct toStruct(Schema schema, JsonNode jsonNode) { - if (jsonNode.isNull()) return null; - Struct struct = new Struct(schema); - jsonNode.fields() - .forEachRemaining(field -> struct.put( - field.getKey(), - mapValue(struct.schema().field(field.getKey()).schema(), field.getValue()) - )); - return struct; - } - - private Object mapValue(Schema schema, JsonNode value) { - if (value == null) return null; - - switch (value.getNodeType()) { - case BOOLEAN: - return value.booleanValue(); - case NUMBER: - if (value.isShort()) { - return value.shortValue(); - } else if (value.isInt()) { - return value.intValue(); - } else if (value.isLong()) { - return value.longValue(); - } else if (value.isBigInteger()) { - return value.bigIntegerValue().longValue(); - } else { - return value.numberValue().doubleValue(); - } - case STRING: - return value.asText(); - case BINARY: - try { - return value.binaryValue(); - } catch (IOException ioe) { - throw new RuntimeException(ioe); - } - case OBJECT: - case POJO: - Struct struct = new Struct(schema); - Iterable> fields = value::fields; - StreamSupport.stream(fields.spliterator(), false) - .forEach(field -> struct.put(field.getKey(), - mapValue(extractSchema(field.getValue()), field.getValue())) - ); - return struct; - case ARRAY: - Iterable arrayElements = value::elements; - return StreamSupport.stream(arrayElements.spliterator(), false) - .map(elm -> mapValue(schema.valueSchema(), elm)) - .collect(Collectors.toList()); - case NULL: - case MISSING: - default: - return null; - } - } - } - - static class JsonRecord { - private final Schema schema; - private final JsonNode value; - - JsonRecord(Schema schema, JsonNode value) { - this.schema = schema; - this.value = value; - } + protected ObjectMapper getObjectMapper() { + return new ObjectMapper(); } } diff --git a/src/main/java/com/github/mmolimar/kafka/connect/fs/file/reader/UnivocityFileReader.java b/src/main/java/com/github/mmolimar/kafka/connect/fs/file/reader/UnivocityFileReader.java index 25a685d..6022e43 100644 --- a/src/main/java/com/github/mmolimar/kafka/connect/fs/file/reader/UnivocityFileReader.java +++ b/src/main/java/com/github/mmolimar/kafka/connect/fs/file/reader/UnivocityFileReader.java @@ -84,19 +84,25 @@ public 
UnivocityFileReader(FileSystem fs, Path filePath, Map con super(fs, filePath, new UnivocityToStruct(), config); this.iterator = iterateRecords(); - this.schema = buildSchema(this.iterator, settings.isHeaderExtractionEnabled(), config); + this.schema = buildSchema(this.iterator, config); } - private Schema buildSchema(ResultIterator it, boolean hasHeader, Map config) { + private Schema buildSchema(ResultIterator it, Map config) { SchemaBuilder builder = SchemaBuilder.struct(); - if (it.hasNext() && !hasHeader) { - Record first = it.next(); - List dataTypes = getDataTypes(config, first.getValues()); - IntStream.range(0, first.getValues().length) - .forEach(index -> builder.field(DEFAULT_COLUMN_NAME + (index + 1), dataTypes.get(index))); - seek(0); - } else if (hasHeader) { - Optional.ofNullable(it.getContext().headers()).ifPresent(headers -> { + if (iterator.hasNext() && !settings.isHeaderExtractionEnabled()) { + String[] headers; + if (settings.getHeaders() == null || settings.getHeaders().length == 0) { + Record first = iterator.next(); + headers = new String[first.getValues().length]; + IntStream.range(0, headers.length).forEach(index -> headers[index] = DEFAULT_COLUMN_NAME + (index + 1)); + seek(0); + } else { + headers = settings.getHeaders(); + } + List dataTypes = getDataTypes(config, headers); + IntStream.range(0, headers.length).forEach(index -> builder.field(headers[index], dataTypes.get(index))); + } else if (settings.isHeaderExtractionEnabled()) { + Optional.ofNullable(iterator.getContext().headers()).ifPresent(headers -> { List dataTypes = getDataTypes(config, headers); IntStream.range(0, headers.length) .forEach(index -> builder.field(headers[index], dataTypes.get(index))); diff --git a/src/main/java/com/github/mmolimar/kafka/connect/fs/file/reader/XmlFileReader.java b/src/main/java/com/github/mmolimar/kafka/connect/fs/file/reader/XmlFileReader.java new file mode 100644 index 0000000..5f7765a --- /dev/null +++ b/src/main/java/com/github/mmolimar/kafka/connect/fs/file/reader/XmlFileReader.java @@ -0,0 +1,58 @@ +package com.github.mmolimar.kafka.connect.fs.file.reader; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.dataformat.xml.XmlMapper; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; + +import java.io.IOException; +import java.util.Map; + +import static com.github.mmolimar.kafka.connect.fs.FsSourceTaskConfig.FILE_READER_PREFIX; + +public class XmlFileReader extends JacksonFileReader { + + private static final String FILE_READER_XML = FILE_READER_PREFIX + "xml."; + private static final String FILE_READER_XML_COMPRESSION = FILE_READER_XML + "compression."; + + static final String FILE_READER_XML_DESERIALIZATION_CONFIGS = FILE_READER_XML + "deserialization."; + + public static final String FILE_READER_XML_RECORD_PER_LINE = FILE_READER_XML + "record_per_line"; + public static final String FILE_READER_XML_COMPRESSION_TYPE = FILE_READER_XML_COMPRESSION + "type"; + public static final String FILE_READER_XML_COMPRESSION_CONCATENATED = FILE_READER_XML_COMPRESSION + "concatenated"; + public static final String FILE_READER_XML_ENCODING = FILE_READER_XML + "encoding"; + + public XmlFileReader(FileSystem fs, Path filePath, Map config) throws IOException { + super(fs, filePath, config); + } + + @Override + protected Object readerEncodingConfig(Map config) { + return config.get(FILE_READER_XML_ENCODING); + } + + @Override + protected Object recordPerLineConfig(Map config) { + return 
config.get(FILE_READER_XML_RECORD_PER_LINE); + } + + @Override + protected Object compressionTypeConfig(Map config) { + return config.get(FILE_READER_XML_COMPRESSION_TYPE); + } + + @Override + protected Object compressionConcatenatedConfig(Map config) { + return config.get(FILE_READER_XML_COMPRESSION_CONCATENATED); + } + + @Override + protected String deserializationConfigPrefix() { + return FILE_READER_XML_DESERIALIZATION_CONFIGS; + } + + @Override + protected ObjectMapper getObjectMapper() { + return new XmlMapper(); + } +} diff --git a/src/main/java/com/github/mmolimar/kafka/connect/fs/file/reader/YamlFileReader.java b/src/main/java/com/github/mmolimar/kafka/connect/fs/file/reader/YamlFileReader.java new file mode 100644 index 0000000..8be5c6b --- /dev/null +++ b/src/main/java/com/github/mmolimar/kafka/connect/fs/file/reader/YamlFileReader.java @@ -0,0 +1,57 @@ +package com.github.mmolimar.kafka.connect.fs.file.reader; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.dataformat.yaml.YAMLMapper; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; + +import java.io.IOException; +import java.util.Map; + +import static com.github.mmolimar.kafka.connect.fs.FsSourceTaskConfig.FILE_READER_PREFIX; + +public class YamlFileReader extends JacksonFileReader { + + private static final String FILE_READER_YAML = FILE_READER_PREFIX + "yaml."; + private static final String FILE_READER_YAML_COMPRESSION = FILE_READER_YAML + "compression."; + + static final String FILE_READER_YAML_DESERIALIZATION_CONFIGS = FILE_READER_YAML + "deserialization."; + + public static final String FILE_READER_YAML_COMPRESSION_TYPE = FILE_READER_YAML_COMPRESSION + "type"; + public static final String FILE_READER_YAML_COMPRESSION_CONCATENATED = FILE_READER_YAML_COMPRESSION + "concatenated"; + public static final String FILE_READER_YAML_ENCODING = FILE_READER_YAML + "encoding"; + + public YamlFileReader(FileSystem fs, Path filePath, Map config) throws IOException { + super(fs, filePath, config); + } + + @Override + protected Object readerEncodingConfig(Map config) { + return config.get(FILE_READER_YAML_ENCODING); + } + + @Override + protected Object recordPerLineConfig(Map config) { + return false; + } + + @Override + protected Object compressionTypeConfig(Map config) { + return config.get(FILE_READER_YAML_COMPRESSION_TYPE); + } + + @Override + protected Object compressionConcatenatedConfig(Map config) { + return config.get(FILE_READER_YAML_COMPRESSION_CONCATENATED); + } + + @Override + protected String deserializationConfigPrefix() { + return FILE_READER_YAML_DESERIALIZATION_CONFIGS; + } + + @Override + protected ObjectMapper getObjectMapper() { + return new YAMLMapper(); + } +} diff --git a/src/main/java/com/github/mmolimar/kafka/connect/fs/policy/AbstractPolicy.java b/src/main/java/com/github/mmolimar/kafka/connect/fs/policy/AbstractPolicy.java index 9d23c04..5e35da4 100644 --- a/src/main/java/com/github/mmolimar/kafka/connect/fs/policy/AbstractPolicy.java +++ b/src/main/java/com/github/mmolimar/kafka/connect/fs/policy/AbstractPolicy.java @@ -7,10 +7,7 @@ import com.github.mmolimar.kafka.connect.fs.util.ReflectionUtils; import com.github.mmolimar.kafka.connect.fs.util.TailCall; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.LocatedFileStatus; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.fs.RemoteIterator; +import org.apache.hadoop.fs.*; import org.apache.kafka.common.utils.Utils; import 
org.apache.kafka.connect.data.Struct; import org.apache.kafka.connect.errors.ConnectException; @@ -39,9 +36,16 @@ abstract class AbstractPolicy implements Policy { private final AtomicLong executions; private final boolean recursive; private final int batchSize; + private final Cleanup cleanup; + private final String cleanupDir; + private final String prefixCleanup; private Iterator> partitions; private boolean interrupted; + enum Cleanup { + NONE, MOVE, DELETE + } + public AbstractPolicy(FsSourceTaskConfig conf) throws IOException { this.fileSystems = new ArrayList<>(); this.conf = conf; @@ -49,6 +53,11 @@ public AbstractPolicy(FsSourceTaskConfig conf) throws IOException { this.recursive = conf.getBoolean(FsSourceTaskConfig.POLICY_RECURSIVE); this.fileRegexp = Pattern.compile(conf.getString(FsSourceTaskConfig.POLICY_REGEXP)); this.batchSize = conf.getInt(FsSourceTaskConfig.POLICY_BATCH_SIZE); + this.cleanup = Optional.ofNullable(conf.getString(FsSourceTaskConfig.POLICY_CLEANUP)) + .map(c -> Cleanup.valueOf(c.toUpperCase())).orElse(Cleanup.NONE); + this.prefixCleanup = Optional.ofNullable(conf.getString(FsSourceTaskConfig.POLICY_CLEANUP_MOVE_DIR_PREFIX)) + .orElse(""); + this.cleanupDir = conf.getString(FsSourceTaskConfig.POLICY_CLEANUP_MOVE_DIR); this.interrupted = false; this.partitions = Collections.emptyIterator(); @@ -227,8 +236,7 @@ current, new Path(metadata.getPath()), conf.originals() long fileSize = Long.parseLong(offsetMap.getOrDefault("file-size", "0").toString()); boolean eof = Boolean.parseBoolean(offsetMap.getOrDefault("eof", "false").toString()); if (metadata.getLen() == fileSize && eof) { - log.info("{} Skipping file [{}] due to it was already processed.", this, metadata.getPath()); - return emptyFileReader(new Path(metadata.getPath())); + return cleanupAndReturn(current, new Path(metadata.getPath())); } else { log.info("{} Seeking to offset [{}] for file [{}].", this, offsetMap.get("offset"), metadata.getPath()); @@ -269,7 +277,27 @@ public FileMetadata next() { }; } - private FileReader emptyFileReader(Path filePath) { + private FileReader cleanupAndReturn(FileSystem srcFs, Path filePath) { + try { + switch (cleanup) { + case NONE: + log.info("{} Skipping file [{}] due to it was already processed.", this, filePath); + break; + case MOVE: + Path target = new Path(cleanupDir, prefixCleanup + filePath.getName()); + FileSystem dstFs = FileSystem.newInstance(target.toUri(), srcFs.getConf()); + log.info("{} Moving file [{}] to [{}] due to it was already processed.", this, filePath, target); + FileUtil.copy(srcFs, filePath, srcFs, target, true, true, dstFs.getConf()); + break; + case DELETE: + log.info("{} Deleting file [{}] due to it was already processed.", this, filePath); + srcFs.delete(filePath, false); + break; + } + } catch (IOException ioe) { + log.warn("{} Cannot apply cleanup of type {} in file [{}]. 
Error message: {}", this, cleanup, filePath, ioe.getMessage()); + } + return new FileReader() { @Override public Path getFilePath() { diff --git a/src/main/scala/com/github/mmolimar/kafka/connect/fs/file/reader/CobrixReader.scala b/src/main/scala/com/github/mmolimar/kafka/connect/fs/file/reader/CobrixReader.scala new file mode 100644 index 0000000..abfb9bc --- /dev/null +++ b/src/main/scala/com/github/mmolimar/kafka/connect/fs/file/reader/CobrixReader.scala @@ -0,0 +1,15 @@ +package com.github.mmolimar.kafka.connect.fs.file.reader + +import com.github.mmolimar.kafka.connect.fs.file.reader.CobolFileReader.StructHandler +import za.co.absa.cobrix.cobol.reader.parameters.ReaderParameters +import za.co.absa.cobrix.cobol.reader.{VarLenNestedReader, VarLenReader} + +import scala.collection.Seq + +protected object CobrixReader { + + def varLenReader(copybookContent: String, params: ReaderParameters): VarLenReader = { + new VarLenNestedReader[java.util.Map[String, AnyRef]](Seq(copybookContent), params, new StructHandler()) + } + +} diff --git a/src/test/java/com/github/mmolimar/kafka/connect/fs/file/reader/AgnosticFileReaderTest.java b/src/test/java/com/github/mmolimar/kafka/connect/fs/file/reader/AgnosticFileReaderTest.java index 3c01e86..1055b2a 100644 --- a/src/test/java/com/github/mmolimar/kafka/connect/fs/file/reader/AgnosticFileReaderTest.java +++ b/src/test/java/com/github/mmolimar/kafka/connect/fs/file/reader/AgnosticFileReaderTest.java @@ -113,6 +113,48 @@ public String getFileExtension() { } } + @Nested + class AgnosticXmlFileReaderTest extends XmlFileReaderTest { + + @Override + protected Map getReaderConfig() { + Map config = super.getReaderConfig(); + config.put(AgnosticFileReader.FILE_READER_AGNOSTIC_EXTENSIONS_XML, getFileExtension()); + return config; + } + + @Override + public Class getReaderClass() { + return AgnosticFileReader.class; + } + + @Override + public String getFileExtension() { + return FILE_EXTENSION; + } + } + + @Nested + class AgnosticYamlFileReaderTest extends YamlFileReaderTest { + + @Override + protected Map getReaderConfig() { + Map config = super.getReaderConfig(); + config.put(AgnosticFileReader.FILE_READER_AGNOSTIC_EXTENSIONS_YAML, getFileExtension()); + return config; + } + + @Override + public Class getReaderClass() { + return AgnosticFileReader.class; + } + + @Override + public String getFileExtension() { + return FILE_EXTENSION; + } + } + @Nested class AgnosticAvroFileReaderTest extends AvroFileReaderTest { @@ -202,4 +244,24 @@ public String getFileExtension() { } } + @Nested + class AgnosticCobolFileReaderTest extends CobolFileReaderTest { + + @Override + protected Map getReaderConfig() { + Map config = super.getReaderConfig(); + config.put(AgnosticFileReader.FILE_READER_AGNOSTIC_EXTENSIONS_COBOL, getFileExtension()); + return config; + } + + @Override + public Class getReaderClass() { + return AgnosticFileReader.class; + } + + @Override + public String getFileExtension() { + return "dt"; + } + } } diff --git a/src/test/java/com/github/mmolimar/kafka/connect/fs/file/reader/CobolFileReaderTest.java b/src/test/java/com/github/mmolimar/kafka/connect/fs/file/reader/CobolFileReaderTest.java new file mode 100644 index 0000000..ec55a81 --- /dev/null +++ b/src/test/java/com/github/mmolimar/kafka/connect/fs/file/reader/CobolFileReaderTest.java @@ -0,0 +1,263 @@ +package com.github.mmolimar.kafka.connect.fs.file.reader; + +import org.apache.hadoop.fs.Path; +import org.apache.kafka.connect.data.Schema; +import org.apache.kafka.connect.data.Struct; +import 
org.apache.kafka.connect.errors.ConnectException; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.MethodSource; + +import java.io.*; +import java.net.URL; +import java.nio.file.Files; +import java.nio.file.Paths; +import java.nio.file.StandardCopyOption; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.UUID; +import java.util.stream.IntStream; + +import static org.junit.jupiter.api.Assertions.*; + +public class CobolFileReaderTest extends FileReaderTestBase { + + private static final String FILE_EXTENSION = "dt"; + private static final String DATA_FILENAME_1 = "companies"; + private static final String DATA_FILENAME_2 = "type-variety"; + private static final String DATA_FILENAME_3 = "code-pages"; + + @Override + protected Class getReaderClass() { + return CobolFileReader.class; + } + + @Override + protected Path createDataFile(ReaderFsTestConfig fsConfig, Object... args) throws IOException { + String filename = args.length < 1 ? DATA_FILENAME_1 : args[0].toString(); + File cobolFile = File.createTempFile("test-", "." + getFileExtension()); + try (InputStream is = CobolFileReaderTest.class.getResourceAsStream("/file/reader/data/cobol/" + filename + "." + getFileExtension())) { + Files.copy(is, cobolFile.toPath(), StandardCopyOption.REPLACE_EXISTING); + } + IntStream.range(0, NUM_RECORDS).forEach(index -> fsConfig.offsetsByIndex().put(index, (long) index)); + Path path = new Path(new Path(fsConfig.getFsUri()), cobolFile.getName()); + fsConfig.getFs().copyFromLocalFile(new Path(cobolFile.getAbsolutePath()), path); + + return path; + } + + @ParameterizedTest + @MethodSource("fileSystemConfigProvider") + public void invalidFileFormat(ReaderFsTestConfig fsConfig) throws IOException { + File tmp = File.createTempFile("test-", "." + getFileExtension()); + try (BufferedWriter writer = new BufferedWriter(new FileWriter(tmp))) { + writer.write("test"); + } + Path path = new Path(new Path(fsConfig.getFsUri()), tmp.getName()); + fsConfig.getFs().moveFromLocalFile(new Path(tmp.getAbsolutePath()), path); + getReader(fsConfig.getFs(), path, getReaderConfig()); + } + + @ParameterizedTest + @MethodSource("fileSystemConfigProvider") + public void emptyFile(ReaderFsTestConfig fsConfig) throws IOException { + File tmp = File.createTempFile("test-", "." 
+ getFileExtension()); + Path path = new Path(new Path(fsConfig.getFsUri()), tmp.getName()); + fsConfig.getFs().moveFromLocalFile(new Path(tmp.getAbsolutePath()), path); + getReader(fsConfig.getFs(), path, getReaderConfig()); + } + + @ParameterizedTest + @MethodSource("fileSystemConfigProvider") + public void emptyCopybook(ReaderFsTestConfig fsConfig) throws IOException { + Path file = createDataFile(fsConfig); + Map readerConfig = getReaderConfig(); + readerConfig.put(CobolFileReader.FILE_READER_COBOL_COPYBOOK_CONTENT, ""); + assertThrows(ConnectException.class, () -> getReader(fsConfig.getFs(), file, readerConfig)); + } + + @ParameterizedTest + @MethodSource("fileSystemConfigProvider") + public void nonExistentCopybook(ReaderFsTestConfig fsConfig) throws IOException { + Path file = createDataFile(fsConfig); + Map readerConfig = getReaderConfig(); + Path copybook = new Path(fsConfig.getFs().getWorkingDirectory(), UUID.randomUUID().toString()); + readerConfig.put(CobolFileReader.FILE_READER_COBOL_COPYBOOK_PATH, copybook.toString()); + assertThrows(ConnectException.class, () -> getReader(fsConfig.getFs(), file, readerConfig)); + } + + @ParameterizedTest + @MethodSource("fileSystemConfigProvider") + public void readAllDataWithCopybookInPath(ReaderFsTestConfig fsConfig) throws IOException { + String dataFilename = DATA_FILENAME_1; + Path file = createDataFile(fsConfig, dataFilename); + Map readerConfig = getReaderConfig(); + readerConfig.put(CobolFileReader.FILE_READER_COBOL_COPYBOOK_PATH, ""); + assertThrows(ConnectException.class, () -> getReader(fsConfig.getFs(), file, readerConfig)); + + File cobolFile = File.createTempFile("copybook-", "." + getFileExtension()); + try (InputStream is = CobolFileReaderTest.class.getResourceAsStream("/file/reader/data/cobol/" + dataFilename + ".cpy")) { + Files.copy(is, cobolFile.toPath(), StandardCopyOption.REPLACE_EXISTING); + } + Path path = new Path(new Path(fsConfig.getFsUri()), cobolFile.getName()); + fsConfig.getFs().copyFromLocalFile(new Path(cobolFile.getAbsolutePath()), path); + readerConfig.put(CobolFileReader.FILE_READER_COBOL_COPYBOOK_PATH, path.toString()); + FileReader reader = getReader(fsConfig.getFs(), file, readerConfig); + + assertTrue(reader.hasNext()); + + int recordCount = 0; + while (reader.hasNext()) { + Struct record = reader.next(); + checkData(record, recordCount); + recordCount++; + } + assertEquals(NUM_RECORDS, recordCount, "The number of records in the file does not match"); + } + + @ParameterizedTest + @MethodSource("fileSystemConfigProvider") + public void readAllDataWithMultipleDataTypes(ReaderFsTestConfig fsConfig) throws IOException { + String dataFilename = DATA_FILENAME_2; + Path file = createDataFile(fsConfig, dataFilename); + Map readerConfig = getReaderConfig(); + readerConfig.put(CobolFileReader.FILE_READER_COBOL_COPYBOOK_CONTENT, copybookContent(dataFilename)); + readerConfig.put(CobolFileReader.FILE_READER_COBOL_READER_SCHEMA_POLICY, "collapse_root"); + readerConfig.put(CobolFileReader.FILE_READER_COBOL_READER_FLOATING_POINT_FORMAT, "ieee754"); + readerConfig.put(CobolFileReader.FILE_READER_COBOL_READER_IS_RECORD_SEQUENCE, "false"); + + FileReader reader = getReader(fsConfig.getFs(), file, readerConfig); + + assertTrue(reader.hasNext()); + + int recordCount = 0; + while (reader.hasNext()) { + Struct record = reader.next(); + recordCount++; + assertEquals(recordCount, record.get("ID")); + assertEquals("Sample", record.get("STRING_VAL")); + } + assertEquals(NUM_RECORDS, recordCount, "The number of records in the 
file does not match"); + } + + @ParameterizedTest + @MethodSource("fileSystemConfigProvider") + public void readAllDataWithBinaryData(ReaderFsTestConfig fsConfig) throws IOException { + String dataFilename = DATA_FILENAME_3; + Path file = createDataFile(fsConfig, dataFilename); + Map readerConfig = getReaderConfig(); + readerConfig.put(CobolFileReader.FILE_READER_COBOL_COPYBOOK_CONTENT, copybookContent(dataFilename)); + readerConfig.put(CobolFileReader.FILE_READER_COBOL_READER_SCHEMA_POLICY, "collapse_root"); + readerConfig.put(CobolFileReader.FILE_READER_COBOL_READER_IS_RECORD_SEQUENCE, "false"); + FileReader reader = getReader(fsConfig.getFs(), file, readerConfig); + + assertTrue(reader.hasNext()); + + int recordCount = 0; + while (reader.hasNext()) { + Struct record = reader.next(); + assertEquals(Schema.Type.STRING, record.schema().field("CURRENCY").schema().type()); + assertEquals(Schema.Type.STRING, record.schema().field("SIGNATURE").schema().type()); + assertEquals(Schema.Type.STRING, record.schema().field("COMPANY_NAME_NP").schema().type()); + assertEquals(Schema.Type.STRING, record.schema().field("COMPANY_ID").schema().type()); + assertEquals(Schema.Type.INT32, record.schema().field("WEALTH_QFY").schema().type()); + assertEquals(Schema.Type.FLOAT64, record.schema().field("AMOUNT").schema().type()); + assertNotNull(record.get("CURRENCY")); + assertNotNull(record.get("SIGNATURE")); + assertNotNull(record.get("COMPANY_NAME_NP")); + assertNotNull(record.get("COMPANY_ID")); + assertNotNull(record.get("WEALTH_QFY")); + assertNotNull(record.get("AMOUNT")); + assertEquals(6, record.schema().fields().size()); + recordCount++; + } + assertEquals(NUM_RECORDS, recordCount, "The number of records in the file does not match"); + } + + @ParameterizedTest + @MethodSource("fileSystemConfigProvider") + public void readAllDataWithBinaryRawData(ReaderFsTestConfig fsConfig) throws IOException { + String dataFilename = DATA_FILENAME_3; + Path file = createDataFile(fsConfig, dataFilename); + Map readerConfig = getReaderConfig(); + readerConfig.put(CobolFileReader.FILE_READER_COBOL_COPYBOOK_CONTENT, copybookContent(dataFilename)); + readerConfig.put(CobolFileReader.FILE_READER_COBOL_READER_SCHEMA_POLICY, "collapse_root"); + readerConfig.put(CobolFileReader.FILE_READER_COBOL_READER_DEBUG_FIELDS_POLICY, "raw"); + readerConfig.put(CobolFileReader.FILE_READER_COBOL_READER_IS_RECORD_SEQUENCE, "false"); + FileReader reader = getReader(fsConfig.getFs(), file, readerConfig); + + assertTrue(reader.hasNext()); + + int recordCount = 0; + while (reader.hasNext()) { + Struct record = reader.next(); + assertEquals(Schema.Type.STRING, record.schema().field("CURRENCY").schema().type()); + assertEquals(Schema.Type.STRING, record.schema().field("SIGNATURE").schema().type()); + assertEquals(Schema.Type.STRING, record.schema().field("COMPANY_NAME_NP").schema().type()); + assertEquals(Schema.Type.STRING, record.schema().field("COMPANY_ID").schema().type()); + assertEquals(Schema.Type.INT32, record.schema().field("WEALTH_QFY").schema().type()); + assertEquals(Schema.Type.FLOAT64, record.schema().field("AMOUNT").schema().type()); + assertNotNull(record.get("CURRENCY")); + assertNotNull(record.get("CURRENCY_debug")); + assertNotNull(record.get("SIGNATURE")); + assertNotNull(record.get("SIGNATURE_debug")); + assertNotNull(record.get("COMPANY_NAME_NP")); + assertNotNull(record.get("COMPANY_NAME_NP_debug")); + assertNotNull(record.get("COMPANY_ID")); + assertNotNull(record.get("COMPANY_ID_debug")); + 
assertNotNull(record.get("WEALTH_QFY")); + assertNotNull(record.get("WEALTH_QFY_debug")); + assertNotNull(record.get("AMOUNT")); + assertNotNull(record.get("AMOUNT_debug")); + assertEquals(12, record.schema().fields().size()); + recordCount++; + } + assertEquals(NUM_RECORDS, recordCount, "The number of records in the file does not match"); + } + + @Override + protected Map getReaderConfig() { + return new HashMap() {{ + put(CobolFileReader.FILE_READER_COBOL_COPYBOOK_CONTENT, copybookContent(DATA_FILENAME_1)); + put(CobolFileReader.FILE_READER_COBOL_READER_IS_RECORD_SEQUENCE, "true"); + }}; + } + + private String copybookContent(String filename) { + URL cpy = CobolFileReaderTest.class.getResource("/file/reader/data/cobol/" + filename + ".cpy"); + try { + return String.join("\n", Files.readAllLines(Paths.get(cpy.toURI()))); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + @Override + protected String getFileExtension() { + return FILE_EXTENSION; + } + + @Override + protected void checkData(Struct record, long index) { + Struct companyDetails = record.getStruct("COMPANY_DETAILS"); + Struct staticDetails = companyDetails.getStruct("STATIC_DETAILS"); + Struct taxpayer = staticDetails.getStruct("TAXPAYER"); + Struct strategy = staticDetails.getStruct("STRATEGY"); + List strategyDetails = strategy.getArray("STRATEGY_DETAIL"); + assertAll( + () -> assertEquals("C", companyDetails.getString("SEGMENT_ID")), + () -> assertEquals(String.format("%010d", index), companyDetails.getString("COMPANY_ID")), + + () -> assertEquals("Sample Q&A Ltd.", staticDetails.getString("COMPANY_NAME")), + () -> assertEquals("223344 AK ave, Wonderland", staticDetails.getString("ADDRESS")), + + () -> assertEquals("A", taxpayer.getString("TAXPAYER_TYPE")), + () -> assertEquals("88888888", taxpayer.getString("TAXPAYER_STR")), + () -> assertNull(taxpayer.getInt32("TAXPAYER_NUM")), + + () -> assertEquals(6, strategyDetails.size()), + () -> assertEquals(1111111, (strategyDetails.get(0)).getInt32("NUM1")), + () -> assertEquals(2222222, (strategyDetails.get(0)).getInt32("NUM2")) + ); + } +} diff --git a/src/test/java/com/github/mmolimar/kafka/connect/fs/file/reader/JacksonFileReaderTest.java b/src/test/java/com/github/mmolimar/kafka/connect/fs/file/reader/JacksonFileReaderTest.java new file mode 100644 index 0000000..2c6bc8a --- /dev/null +++ b/src/test/java/com/github/mmolimar/kafka/connect/fs/file/reader/JacksonFileReaderTest.java @@ -0,0 +1,210 @@ +package com.github.mmolimar.kafka.connect.fs.file.reader; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.DeserializationFeature; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.ObjectWriter; +import com.fasterxml.jackson.databind.node.ArrayNode; +import com.fasterxml.jackson.databind.node.JsonNodeFactory; +import com.fasterxml.jackson.databind.node.ObjectNode; +import org.apache.hadoop.fs.Path; +import org.apache.kafka.connect.data.Struct; +import org.apache.kafka.connect.errors.ConnectException; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.MethodSource; + +import java.io.File; +import java.io.IOException; +import java.io.PrintWriter; +import java.math.BigInteger; +import java.nio.charset.UnsupportedCharsetException; +import java.util.Arrays; +import java.util.HashMap; +import java.util.Map; +import java.util.UUID; +import java.util.stream.IntStream; + +import static org.junit.jupiter.api.Assertions.*; + +abstract 
class JacksonFileReaderTest extends FileReaderTestBase { + + static final String FIELD_INTEGER = "integerField"; + static final String FIELD_BIG_INTEGER = "bigIntegerField"; + static final String FIELD_LONG = "longField"; + static final String FIELD_BOOLEAN = "booleanField"; + static final String FIELD_STRING = "stringField"; + static final String FIELD_DECIMAL = "decimalField"; + static final String FIELD_BINARY = "binaryField"; + static final String FIELD_ARRAY_SIMPLE = "arraySimpleField"; + static final String FIELD_ARRAY_COMPLEX = "arrayComplexField"; + static final String FIELD_STRUCT = "structField"; + static final String FIELD_NULL = "nullField"; + static final CompressionType COMPRESSION_TYPE_DEFAULT = CompressionType.NONE; + + @Override + protected Path createDataFile(ReaderFsTestConfig fsConfig, Object... args) throws IOException { + int numRecords = args.length < 1 ? NUM_RECORDS : (int) args[0]; + boolean recordPerLine = args.length < 2 || (boolean) args[1]; + CompressionType compression = args.length < 3 ? COMPRESSION_TYPE_DEFAULT : (CompressionType) args[2]; + File txtFile = File.createTempFile("test-", "." + getFileExtension()); + try (PrintWriter writer = new PrintWriter(getOutputStream(txtFile, compression))) { + ObjectWriter objectWriter = getObjectMapper().writerWithDefaultPrettyPrinter(); + IntStream.range(0, numRecords).forEach(index -> { + ObjectNode node = JsonNodeFactory.instance.objectNode() + .put(FIELD_INTEGER, index) + .put(FIELD_BIG_INTEGER, new BigInteger("9999999999999999999")) + .put(FIELD_LONG, Long.MAX_VALUE) + .put(FIELD_STRING, String.format("%d_%s", index, UUID.randomUUID())) + .put(FIELD_BOOLEAN, true) + .put(FIELD_DECIMAL, Double.parseDouble(index + "." + index)) + .put(FIELD_BINARY, "test".getBytes()) + .put(FIELD_NULL, (String) null); + node.putArray(FIELD_ARRAY_SIMPLE) + .add("elm[" + index + "]") + .add("elm[" + (index + 1) + "]"); + ArrayNode array = node.putArray(FIELD_ARRAY_COMPLEX); + array.addObject() + .put(FIELD_INTEGER, index) + .put(FIELD_LONG, Long.MAX_VALUE) + .put(FIELD_STRING, String.format("%d_%s", index, UUID.randomUUID())) + .put(FIELD_BOOLEAN, true) + .put(FIELD_DECIMAL, Double.parseDouble(index + "." + index)) + .put(FIELD_NULL, (String) null); + array.addObject() + .put(FIELD_INTEGER, index + 1) + .put(FIELD_LONG, Long.MAX_VALUE) + .put(FIELD_STRING, String.format("%d_%s", index, UUID.randomUUID())) + .put(FIELD_BOOLEAN, true) + .put(FIELD_DECIMAL, Double.parseDouble(index + "." + index)) + .put(FIELD_NULL, (String) null); + node.putObject(FIELD_STRUCT) + .put(FIELD_INTEGER, (short) index) + .put(FIELD_LONG, Long.MAX_VALUE) + .put(FIELD_STRING, String.format("%d_%s", index, UUID.randomUUID())) + .put(FIELD_BOOLEAN, true) + .put(FIELD_DECIMAL, Double.parseDouble(index + "." + index)) + .put(FIELD_NULL, (String) null); + try { + writer.append(recordPerLine ? objectWriter.writeValueAsString(node).replaceAll("\n", "") + "\n" : objectWriter.writeValueAsString(node)); + } catch (JsonProcessingException jpe) { + throw new RuntimeException(jpe); + } + fsConfig.offsetsByIndex().put(index, (long) index); + }); + } + Path path = new Path(new Path(fsConfig.getFsUri()), txtFile.getName()); + fsConfig.getFs().moveFromLocalFile(new Path(txtFile.getAbsolutePath()), path); + return path; + } + + @ParameterizedTest + @MethodSource("fileSystemConfigProvider") + public void emptyFile(ReaderFsTestConfig fsConfig) throws IOException { + File tmp = File.createTempFile("test-", "." 
+ getFileExtension()); + Path path = new Path(new Path(fsConfig.getFsUri()), tmp.getName()); + fsConfig.getFs().moveFromLocalFile(new Path(tmp.getAbsolutePath()), path); + FileReader reader = getReader(fsConfig.getFs(), path, getReaderConfig()); + assertFalse(reader.hasNext()); + } + + @ParameterizedTest + @MethodSource("fileSystemConfigProvider") + public void validFileEncoding(ReaderFsTestConfig fsConfig) { + Map readerConfig = getReaderConfig(); + readerConfig.put(readerEncodingConfig(), "Cp1252"); + fsConfig.setReader(getReader(fsConfig.getFs(), fsConfig.getDataFile(), readerConfig)); + readAllData(fsConfig); + } + + @ParameterizedTest + @MethodSource("fileSystemConfigProvider") + public void invalidDeserializationConfig(ReaderFsTestConfig fsConfig) { + Map readerConfig = getReaderConfig(); + readerConfig.put(deserializationConfigPrefix() + "invalid", "false"); + fsConfig.setReader(getReader(fsConfig.getFs(), fsConfig.getDataFile(), readerConfig)); + readAllData(fsConfig); + } + + @ParameterizedTest + @MethodSource("fileSystemConfigProvider") + public void invalidFileEncoding(ReaderFsTestConfig fsConfig) { + Map readerConfig = getReaderConfig(); + readerConfig.put(readerEncodingConfig(), "invalid_charset"); + assertThrows(ConnectException.class, () -> getReader(fsConfig.getFs(), fsConfig.getDataFile(), readerConfig)); + assertThrows(UnsupportedCharsetException.class, () -> { + try { + getReader(fsConfig.getFs(), fsConfig.getDataFile(), readerConfig); + } catch (Exception e) { + throw e.getCause(); + } + }); + } + + @ParameterizedTest + @MethodSource("fileSystemConfigProvider") + public void readDataWithRecordPerLineDisabled(ReaderFsTestConfig fsConfig) throws IOException { + Path file = createDataFile(fsConfig, 1, false); + Map readerConfig = getReaderConfig(); + readerConfig.put(recordPerLineConfig(), "false"); + FileReader reader = getReader(fsConfig.getFs(), file, readerConfig); + + assertTrue(reader.hasNext()); + + int recordCount = 0; + while (reader.hasNext()) { + Struct record = reader.next(); + checkData(record, recordCount); + recordCount++; + } + reader.close(); + assertEquals(1, recordCount, "The number of records in the file does not match"); + } + + @ParameterizedTest + @MethodSource("fileSystemConfigProvider") + public void readDifferentCompressionTypes(ReaderFsTestConfig fsConfig) { + Arrays.stream(CompressionType.values()).forEach(compressionType -> { + try { + Path file = createDataFile(fsConfig, NUM_RECORDS, true, compressionType); + Map readerConfig = getReaderConfig(); + readerConfig.put(compressionTypeConfig(), compressionType.toString()); + readerConfig.put(compressionConcatenatedConfig(), "true"); + FileReader reader = getReader(fsConfig.getFs(), file, readerConfig); + + assertTrue(reader.hasNext()); + + int recordCount = 0; + while (reader.hasNext()) { + Struct record = reader.next(); + checkData(record, recordCount); + recordCount++; + } + reader.close(); + assertEquals(NUM_RECORDS, recordCount, "The number of records in the file does not match"); + } catch (Exception e) { + throw new RuntimeException(e); + } + }); + } + + @Override + protected Map getReaderConfig() { + return new HashMap() {{ + String deserializationConfig = DeserializationFeature.ACCEPT_EMPTY_ARRAY_AS_NULL_OBJECT.name(); + put(deserializationConfigPrefix() + deserializationConfig, "true"); + }}; + } + + protected abstract String readerEncodingConfig(); + + protected abstract String recordPerLineConfig(); + + protected abstract String compressionTypeConfig(); + + protected abstract String 
compressionConcatenatedConfig(); + + protected abstract String deserializationConfigPrefix(); + + protected abstract ObjectMapper getObjectMapper(); + +} diff --git a/src/test/java/com/github/mmolimar/kafka/connect/fs/file/reader/JsonFileReaderTest.java b/src/test/java/com/github/mmolimar/kafka/connect/fs/file/reader/JsonFileReaderTest.java index f7f6da0..6602746 100644 --- a/src/test/java/com/github/mmolimar/kafka/connect/fs/file/reader/JsonFileReaderTest.java +++ b/src/test/java/com/github/mmolimar/kafka/connect/fs/file/reader/JsonFileReaderTest.java @@ -1,202 +1,17 @@ package com.github.mmolimar.kafka.connect.fs.file.reader; -import com.fasterxml.jackson.core.JsonProcessingException; -import com.fasterxml.jackson.databind.DeserializationFeature; import com.fasterxml.jackson.databind.ObjectMapper; -import com.fasterxml.jackson.databind.ObjectWriter; -import com.fasterxml.jackson.databind.node.ArrayNode; -import com.fasterxml.jackson.databind.node.JsonNodeFactory; -import com.fasterxml.jackson.databind.node.ObjectNode; -import org.apache.hadoop.fs.Path; import org.apache.kafka.connect.data.Struct; -import org.apache.kafka.connect.errors.ConnectException; -import org.junit.jupiter.params.ParameterizedTest; -import org.junit.jupiter.params.provider.MethodSource; -import java.io.File; -import java.io.IOException; -import java.io.PrintWriter; import java.math.BigInteger; -import java.nio.charset.UnsupportedCharsetException; -import java.util.*; -import java.util.stream.IntStream; +import java.util.Arrays; +import java.util.List; import static org.junit.jupiter.api.Assertions.*; -public class JsonFileReaderTest extends FileReaderTestBase { - - private static final String FIELD_INTEGER = "integerField"; - private static final String FIELD_BIG_INTEGER = "bigIntegerField"; - private static final String FIELD_LONG = "longField"; - private static final String FIELD_BOOLEAN = "booleanField"; - private static final String FIELD_STRING = "stringField"; - private static final String FIELD_DECIMAL = "decimalField"; - private static final String FIELD_BINARY = "binaryField"; - private static final String FIELD_ARRAY_SIMPLE = "arraySimpleField"; - private static final String FIELD_ARRAY_COMPLEX = "arrayComplexField"; - private static final String FIELD_STRUCT = "structField"; - private static final String FIELD_NULL = "nullField"; - private static final String FILE_EXTENSION = "jsn"; - private static final CompressionType COMPRESSION_TYPE_DEFAULT = CompressionType.NONE; - - @Override - protected Path createDataFile(ReaderFsTestConfig fsConfig, Object... args) throws IOException { - int numRecords = args.length < 1 ? NUM_RECORDS : (int) args[0]; - boolean recordPerLine = args.length < 2 || (boolean) args[1]; - CompressionType compression = args.length < 3 ? COMPRESSION_TYPE_DEFAULT : (CompressionType) args[2]; - File txtFile = File.createTempFile("test-", "." + getFileExtension()); - try (PrintWriter writer = new PrintWriter(getOutputStream(txtFile, compression))) { - ObjectWriter jsonWriter = new ObjectMapper().writerWithDefaultPrettyPrinter(); - IntStream.range(0, numRecords).forEach(index -> { - ObjectNode json = JsonNodeFactory.instance.objectNode() - .put(FIELD_INTEGER, index) - .put(FIELD_BIG_INTEGER, new BigInteger("9999999999999999999")) - .put(FIELD_LONG, Long.MAX_VALUE) - .put(FIELD_STRING, String.format("%d_%s", index, UUID.randomUUID())) - .put(FIELD_BOOLEAN, true) - .put(FIELD_DECIMAL, Double.parseDouble(index + "." 
+ index)) - .put(FIELD_BINARY, "test".getBytes()) - .put(FIELD_NULL, (String) null); - json.putArray(FIELD_ARRAY_SIMPLE) - .add("elm[" + index + "]") - .add("elm[" + (index + 1) + "]"); - ArrayNode array = json.putArray(FIELD_ARRAY_COMPLEX); - array.addObject() - .put(FIELD_INTEGER, index) - .put(FIELD_LONG, Long.MAX_VALUE) - .put(FIELD_STRING, String.format("%d_%s", index, UUID.randomUUID())) - .put(FIELD_BOOLEAN, true) - .put(FIELD_DECIMAL, Double.parseDouble(index + "." + index)) - .put(FIELD_NULL, (String) null); - array.addObject() - .put(FIELD_INTEGER, index + 1) - .put(FIELD_LONG, Long.MAX_VALUE) - .put(FIELD_STRING, String.format("%d_%s", index, UUID.randomUUID())) - .put(FIELD_BOOLEAN, true) - .put(FIELD_DECIMAL, Double.parseDouble(index + "." + index)) - .put(FIELD_NULL, (String) null); - json.putObject(FIELD_STRUCT) - .put(FIELD_INTEGER, (short) index) - .put(FIELD_LONG, Long.MAX_VALUE) - .put(FIELD_STRING, String.format("%d_%s", index, UUID.randomUUID())) - .put(FIELD_BOOLEAN, true) - .put(FIELD_DECIMAL, Double.parseDouble(index + "." + index)) - .put(FIELD_NULL, (String) null); - try { - writer.append(recordPerLine ? json.toString() + "\n" : jsonWriter.writeValueAsString(json)); - } catch (JsonProcessingException jpe) { - throw new RuntimeException(jpe); - } - fsConfig.offsetsByIndex().put(index, (long) index); - }); - } - Path path = new Path(new Path(fsConfig.getFsUri()), txtFile.getName()); - fsConfig.getFs().moveFromLocalFile(new Path(txtFile.getAbsolutePath()), path); - return path; - } - - @ParameterizedTest - @MethodSource("fileSystemConfigProvider") - public void emptyFile(ReaderFsTestConfig fsConfig) throws IOException { - File tmp = File.createTempFile("test-", "." + getFileExtension()); - Path path = new Path(new Path(fsConfig.getFsUri()), tmp.getName()); - fsConfig.getFs().moveFromLocalFile(new Path(tmp.getAbsolutePath()), path); - FileReader reader = getReader(fsConfig.getFs(), path, getReaderConfig()); - assertFalse(reader.hasNext()); - } - - @ParameterizedTest - @MethodSource("fileSystemConfigProvider") - public void validFileEncoding(ReaderFsTestConfig fsConfig) { - Map readerConfig = getReaderConfig(); - readerConfig.put(JsonFileReader.FILE_READER_JSON_ENCODING, "Cp1252"); - fsConfig.setReader(getReader(fsConfig.getFs(), fsConfig.getDataFile(), readerConfig)); - readAllData(fsConfig); - } - - @ParameterizedTest - @MethodSource("fileSystemConfigProvider") - public void invalidDeserializationConfig(ReaderFsTestConfig fsConfig) { - Map readerConfig = getReaderConfig(); - readerConfig.put(JsonFileReader.FILE_READER_JSON_DESERIALIZATION_CONFIGS + "invalid", "false"); - fsConfig.setReader(getReader(fsConfig.getFs(), fsConfig.getDataFile(), readerConfig)); - readAllData(fsConfig); - } - - @ParameterizedTest - @MethodSource("fileSystemConfigProvider") - public void invalidFileEncoding(ReaderFsTestConfig fsConfig) { - Map readerConfig = getReaderConfig(); - readerConfig.put(JsonFileReader.FILE_READER_JSON_ENCODING, "invalid_charset"); - assertThrows(ConnectException.class, () -> getReader(fsConfig.getFs(), fsConfig.getDataFile(), readerConfig)); - assertThrows(UnsupportedCharsetException.class, () -> { - try { - getReader(fsConfig.getFs(), fsConfig.getDataFile(), readerConfig); - } catch (Exception e) { - throw e.getCause(); - } - }); - } +public class JsonFileReaderTest extends JacksonFileReaderTest { - @ParameterizedTest - @MethodSource("fileSystemConfigProvider") - public void readDataWithRecordPerLineDisabled(ReaderFsTestConfig fsConfig) throws IOException { - Path 
file = createDataFile(fsConfig, 1, false); - Map readerConfig = getReaderConfig(); - readerConfig.put(JsonFileReader.FILE_READER_JSON_RECORD_PER_LINE, "false"); - FileReader reader = getReader(fsConfig.getFs(), file, readerConfig); - - assertTrue(reader.hasNext()); - - int recordCount = 0; - while (reader.hasNext()) { - Struct record = reader.next(); - checkData(record, recordCount); - recordCount++; - } - reader.close(); - assertEquals(1, recordCount, "The number of records in the file does not match"); - } - - @ParameterizedTest - @MethodSource("fileSystemConfigProvider") - public void readDifferentCompressionTypes(ReaderFsTestConfig fsConfig) { - Arrays.stream(CompressionType.values()).forEach(compressionType -> { - try { - Path file = createDataFile(fsConfig, NUM_RECORDS, true, compressionType); - Map readerConfig = getReaderConfig(); - readerConfig.put(JsonFileReader.FILE_READER_JSON_COMPRESSION_TYPE, compressionType.toString()); - readerConfig.put(JsonFileReader.FILE_READER_JSON_COMPRESSION_CONCATENATED, "true"); - FileReader reader = getReader(fsConfig.getFs(), file, readerConfig); - - assertTrue(reader.hasNext()); - - int recordCount = 0; - while (reader.hasNext()) { - Struct record = reader.next(); - checkData(record, recordCount); - recordCount++; - } - reader.close(); - assertEquals(NUM_RECORDS, recordCount, "The number of records in the file does not match"); - } catch (Exception e) { - throw new RuntimeException(e); - } - }); - } - - @Override - protected Class getReaderClass() { - return JsonFileReader.class; - } - - @Override - protected Map getReaderConfig() { - return new HashMap() {{ - String deserializationConfig = DeserializationFeature.ACCEPT_EMPTY_ARRAY_AS_NULL_OBJECT.name(); - put(JsonFileReader.FILE_READER_JSON_DESERIALIZATION_CONFIGS + deserializationConfig, "true"); - }}; - } + private static final String FILE_EXTENSION = "jsn"; @Override protected void checkData(Struct record, long index) { @@ -239,8 +54,43 @@ protected void checkData(Struct record, long index) { ); } + @Override + protected Class getReaderClass() { + return JsonFileReader.class; + } + @Override protected String getFileExtension() { return FILE_EXTENSION; } + + @Override + protected String readerEncodingConfig() { + return JsonFileReader.FILE_READER_JSON_ENCODING; + } + + @Override + protected String recordPerLineConfig() { + return JsonFileReader.FILE_READER_JSON_RECORD_PER_LINE; + } + + @Override + protected String compressionTypeConfig() { + return JsonFileReader.FILE_READER_JSON_COMPRESSION_TYPE; + } + + @Override + protected String compressionConcatenatedConfig() { + return JsonFileReader.FILE_READER_JSON_COMPRESSION_CONCATENATED; + } + + @Override + protected String deserializationConfigPrefix() { + return JsonFileReader.FILE_READER_JSON_DESERIALIZATION_CONFIGS; + } + + @Override + protected ObjectMapper getObjectMapper() { + return new ObjectMapper(); + } } diff --git a/src/test/java/com/github/mmolimar/kafka/connect/fs/file/reader/UnivocityFileReaderTest.java b/src/test/java/com/github/mmolimar/kafka/connect/fs/file/reader/UnivocityFileReaderTest.java index 27c0edf..49449ea 100644 --- a/src/test/java/com/github/mmolimar/kafka/connect/fs/file/reader/UnivocityFileReaderTest.java +++ b/src/test/java/com/github/mmolimar/kafka/connect/fs/file/reader/UnivocityFileReaderTest.java @@ -94,6 +94,29 @@ public void readAllDataWithoutHeader(ReaderFsTestConfig fsConfig) throws IOExcep assertEquals(NUM_RECORDS, recordCount, "The number of records in the file does not match"); } + @ParameterizedTest + 
@MethodSource("fileSystemConfigProvider") + public void readAllDataWithCustomHeaders(ReaderFsTestConfig fsConfig) throws IOException { + Path file = createDataFile(fsConfig, false); + Map readerConfig = getReaderConfig(); + readerConfig.put(T.FILE_READER_DELIMITED_SETTINGS_HEADER, "false"); + // NOTE: 9 custom header names to match the quantity of static fields + // FIELD_COLUMN1, ... , FIELD_COLUMN9 + String[] headers = new String[] { "a", "b", "c", "d", "e", "f", "g", "h", "i" }; + readerConfig.put(T.FILE_READER_DELIMITED_SETTINGS_HEADER_NAMES, String.join(",", headers)); + FileReader reader = getReader(fsConfig.getFs(), file, readerConfig); + + assertTrue(reader.hasNext()); + + int recordCount = 0; + while (reader.hasNext()) { + Struct record = reader.next(); + checkDataWithHeaders(record, recordCount, headers); + recordCount++; + } + assertEquals(NUM_RECORDS, recordCount, "The number of records in the file does not match"); + } + @ParameterizedTest @MethodSource("fileSystemConfigProvider") public void readAllDataWithoutSchema(ReaderFsTestConfig fsConfig) throws IOException { @@ -261,6 +284,19 @@ protected void checkData(Struct record, long index) { ); } + protected void checkDataWithHeaders(Struct record, long index, String[] headers) { + assertAll(() -> assertEquals((byte) 2, record.get(headers[0])), + () -> assertEquals((short) 4, record.get(headers[1])), + () -> assertEquals(8, record.get(headers[2])), + () -> assertEquals(16L, record.get(headers[3])), + () -> assertEquals(32.32f, record.get(headers[4])), + () -> assertEquals(64.64d, record.get(headers[5])), + () -> assertEquals(true, record.get(headers[6])), + () -> assertEquals("test bytes", new String((byte[]) record.get(headers[7]))), + () -> assertEquals("test string", record.get(headers[8])) + ); + } + protected void checkDataString(Struct record) { assertAll( () -> assertEquals("2", record.get(FIELD_COLUMN1)), diff --git a/src/test/java/com/github/mmolimar/kafka/connect/fs/file/reader/XmlFileReaderTest.java b/src/test/java/com/github/mmolimar/kafka/connect/fs/file/reader/XmlFileReaderTest.java new file mode 100644 index 0000000..8a035ed --- /dev/null +++ b/src/test/java/com/github/mmolimar/kafka/connect/fs/file/reader/XmlFileReaderTest.java @@ -0,0 +1,93 @@ +package com.github.mmolimar.kafka.connect.fs.file.reader; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.dataformat.xml.XmlMapper; +import org.apache.kafka.connect.data.Struct; + +import static org.junit.jupiter.api.Assertions.*; + +public class XmlFileReaderTest extends JacksonFileReaderTest { + + private static final String FILE_EXTENSION = "xl"; + + @Override + protected void checkData(Struct record, long index) { + Struct array = record.getStruct(FIELD_ARRAY_COMPLEX); + Struct subrecord = record.getStruct(FIELD_STRUCT); + assertAll( + () -> assertEquals(index, Integer.parseInt(record.getString(FIELD_INTEGER))), + () -> assertEquals("9999999999999999999", record.get(FIELD_BIG_INTEGER)), + () -> assertEquals(Long.MAX_VALUE, Long.parseLong(record.getString(FIELD_LONG))), + () -> assertTrue(record.get(FIELD_STRING).toString().startsWith(index + "_")), + () -> assertTrue(Boolean.parseBoolean(record.get(FIELD_BOOLEAN).toString())), + () -> assertEquals(Double.parseDouble(index + "." 
+ index), Double.parseDouble(record.getString(FIELD_DECIMAL))), + () -> assertNull(record.get(FIELD_NULL)), + () -> assertNotNull(record.schema().field(FIELD_NULL)), + () -> assertEquals("dGVzdA==", record.get(FIELD_BINARY)), + () -> assertEquals("elm[" + (index + 1) + "]", record.get(FIELD_ARRAY_SIMPLE)), + + () -> assertEquals(index + 1, Integer.parseInt(array.getString(FIELD_INTEGER))), + () -> assertEquals(Long.MAX_VALUE, Long.parseLong(array.getString(FIELD_LONG))), + () -> assertTrue(array.get(FIELD_STRING).toString().startsWith(index + "_")), + () -> assertTrue(Boolean.parseBoolean(array.get(FIELD_BOOLEAN).toString())), + () -> assertEquals(Double.parseDouble(index + "." + index), Double.parseDouble(array.getString(FIELD_DECIMAL))), + () -> assertNull(array.get(FIELD_NULL)), + () -> assertNotNull(array.schema().field(FIELD_NULL)), + () -> assertEquals(index + 1, Integer.parseInt(array.getString(FIELD_INTEGER))), + () -> assertEquals(Long.MAX_VALUE, Long.parseLong(array.getString(FIELD_LONG))), + () -> assertTrue(array.get(FIELD_STRING).toString().startsWith(index + "_")), + () -> assertTrue(Boolean.parseBoolean(array.get(FIELD_BOOLEAN).toString())), + () -> assertEquals(Double.parseDouble(index + "." + index), Double.parseDouble(array.getString(FIELD_DECIMAL))), + () -> assertNull(array.get(FIELD_NULL)), + () -> assertNotNull(array.schema().field(FIELD_NULL)), + + () -> assertEquals(index, Integer.parseInt(subrecord.getString(FIELD_INTEGER))), + () -> assertEquals(Long.MAX_VALUE, Long.parseLong(subrecord.getString(FIELD_LONG))), + () -> assertTrue(subrecord.get(FIELD_STRING).toString().startsWith(index + "_")), + () -> assertTrue(Boolean.parseBoolean(subrecord.get(FIELD_BOOLEAN).toString())), + () -> assertEquals(Double.parseDouble(index + "." 
+ index), Double.parseDouble(subrecord.getString(FIELD_DECIMAL))), + () -> assertNull(subrecord.get(FIELD_NULL)), + () -> assertNotNull(subrecord.schema().field(FIELD_NULL)) + ); + } + + @Override + protected Class getReaderClass() { + return XmlFileReader.class; + } + + @Override + protected String getFileExtension() { + return FILE_EXTENSION; + } + + @Override + protected String readerEncodingConfig() { + return XmlFileReader.FILE_READER_XML_ENCODING; + } + + @Override + protected String recordPerLineConfig() { + return XmlFileReader.FILE_READER_XML_RECORD_PER_LINE; + } + + @Override + protected String compressionTypeConfig() { + return XmlFileReader.FILE_READER_XML_COMPRESSION_TYPE; + } + + @Override + protected String compressionConcatenatedConfig() { + return XmlFileReader.FILE_READER_XML_COMPRESSION_CONCATENATED; + } + + @Override + protected String deserializationConfigPrefix() { + return XmlFileReader.FILE_READER_XML_DESERIALIZATION_CONFIGS; + } + + @Override + protected ObjectMapper getObjectMapper() { + return new XmlMapper(); + } +} diff --git a/src/test/java/com/github/mmolimar/kafka/connect/fs/file/reader/YamlFileReaderTest.java b/src/test/java/com/github/mmolimar/kafka/connect/fs/file/reader/YamlFileReaderTest.java new file mode 100644 index 0000000..121bda3 --- /dev/null +++ b/src/test/java/com/github/mmolimar/kafka/connect/fs/file/reader/YamlFileReaderTest.java @@ -0,0 +1,188 @@ +package com.github.mmolimar.kafka.connect.fs.file.reader; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.dataformat.yaml.YAMLMapper; +import org.apache.hadoop.fs.Path; +import org.apache.kafka.connect.data.Struct; +import org.junit.jupiter.api.Disabled; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.MethodSource; + +import java.io.BufferedWriter; +import java.io.File; +import java.io.FileWriter; +import java.io.IOException; +import java.math.BigInteger; +import java.util.Arrays; +import java.util.List; +import java.util.Map; + +import static org.junit.jupiter.api.Assertions.*; + +public class YamlFileReaderTest extends JacksonFileReaderTest { + + private static final String FILE_EXTENSION = "yl"; + + protected static final int NUM_RECORDS = 1; + + @Override + protected Path createDataFile(ReaderFsTestConfig fsConfig, Object... args) throws IOException { + CompressionType compression = args.length < 3 ? COMPRESSION_TYPE_DEFAULT : (CompressionType) args[2]; + return super.createDataFile(fsConfig, 1, false, compression); + } + + @ParameterizedTest + @MethodSource("fileSystemConfigProvider") + public void invalidFileFormat(ReaderFsTestConfig fsConfig) throws IOException { + File tmp = File.createTempFile("test-", "." 
+ getFileExtension()); + try (BufferedWriter writer = new BufferedWriter(new FileWriter(tmp))) { + writer.write("test"); + } + Path path = new Path(new Path(fsConfig.getFsUri()), tmp.getName()); + fsConfig.getFs().moveFromLocalFile(new Path(tmp.getAbsolutePath()), path); + getReader(fsConfig.getFs(), path, getReaderConfig()); + } + + @ParameterizedTest + @MethodSource("fileSystemConfigProvider") + public void readAllData(ReaderFsTestConfig fsConfig) { + FileReader reader = fsConfig.getReader(); + assertTrue(reader.hasNext()); + + int recordCount = 0; + while (reader.hasNext()) { + Struct record = reader.next(); + checkData(record, recordCount); + recordCount++; + } + assertEquals(NUM_RECORDS, recordCount, "The number of records in the file does not match"); + } + + @ParameterizedTest + @MethodSource("fileSystemConfigProvider") + @Disabled + public void seekFile(ReaderFsTestConfig fsConfig) { + } + + @ParameterizedTest + @MethodSource("fileSystemConfigProvider") + @Disabled + public void exceededSeek(ReaderFsTestConfig fsConfig) { + + } + + @ParameterizedTest + @MethodSource("fileSystemConfigProvider") + @Disabled + public void readAllDataInBatches(ReaderFsTestConfig fsConfig) { + + } + + @ParameterizedTest + @MethodSource("fileSystemConfigProvider") + public void readDifferentCompressionTypes(ReaderFsTestConfig fsConfig) { + Arrays.stream(CompressionType.values()).forEach(compressionType -> { + try { + Path file = createDataFile(fsConfig, NUM_RECORDS, true, compressionType); + Map readerConfig = getReaderConfig(); + readerConfig.put(compressionTypeConfig(), compressionType.toString()); + readerConfig.put(compressionConcatenatedConfig(), "true"); + FileReader reader = getReader(fsConfig.getFs(), file, readerConfig); + + assertTrue(reader.hasNext()); + + int recordCount = 0; + while (reader.hasNext()) { + Struct record = reader.next(); + checkData(record, recordCount); + recordCount++; + } + reader.close(); + assertEquals(NUM_RECORDS, recordCount, "The number of records in the file does not match"); + } catch (Exception e) { + throw new RuntimeException(e); + } + }); + } + + @Override + protected void checkData(Struct record, long index) { + List array = record.getArray(FIELD_ARRAY_COMPLEX); + Struct subrecord = record.getStruct(FIELD_STRUCT); + assertAll( + () -> assertEquals(index, (int) record.get(FIELD_INTEGER)), + () -> assertEquals(new BigInteger("9999999999999999999").longValue(), record.get(FIELD_BIG_INTEGER)), + () -> assertEquals(Long.MAX_VALUE, (long) record.get(FIELD_LONG)), + () -> assertTrue(record.get(FIELD_STRING).toString().startsWith(index + "_")), + () -> assertTrue(Boolean.parseBoolean(record.get(FIELD_BOOLEAN).toString())), + () -> assertEquals(Double.parseDouble(index + "." + index), (Double) record.get(FIELD_DECIMAL), 0), + () -> assertNull(record.get(FIELD_NULL)), + () -> assertNotNull(record.schema().field(FIELD_NULL)), + () -> assertEquals("test", new String((byte[]) record.get(FIELD_BINARY))), + () -> assertEquals(Arrays.asList("elm[" + index + "]", "elm[" + (index + 1) + "]"), record.get(FIELD_ARRAY_SIMPLE)), + + () -> assertEquals(index, (int) array.get(0).get(FIELD_INTEGER)), + () -> assertEquals(Long.MAX_VALUE, (long) array.get(0).get(FIELD_LONG)), + () -> assertTrue(array.get(0).get(FIELD_STRING).toString().startsWith(index + "_")), + () -> assertTrue(Boolean.parseBoolean(array.get(0).get(FIELD_BOOLEAN).toString())), + () -> assertEquals(Double.parseDouble(index + "." 
+ index), (Double) array.get(0).get(FIELD_DECIMAL), 0), + () -> assertNull(array.get(0).get(FIELD_NULL)), + () -> assertNotNull(array.get(0).schema().field(FIELD_NULL)), + () -> assertEquals(index + 1, (int) array.get(1).get(FIELD_INTEGER)), + () -> assertEquals(Long.MAX_VALUE, (long) array.get(1).get(FIELD_LONG)), + () -> assertTrue(array.get(1).get(FIELD_STRING).toString().startsWith(index + "_")), + () -> assertTrue(Boolean.parseBoolean(array.get(1).get(FIELD_BOOLEAN).toString())), + () -> assertEquals(Double.parseDouble(index + "." + index), (Double) array.get(1).get(FIELD_DECIMAL), 0), + () -> assertNull(array.get(1).get(FIELD_NULL)), + () -> assertNotNull(array.get(1).schema().field(FIELD_NULL)), + + () -> assertEquals(index, (int) subrecord.get(FIELD_INTEGER)), + () -> assertEquals(Long.MAX_VALUE, (long) subrecord.get(FIELD_LONG)), + () -> assertTrue(subrecord.get(FIELD_STRING).toString().startsWith(index + "_")), + () -> assertTrue(Boolean.parseBoolean(subrecord.get(FIELD_BOOLEAN).toString())), + () -> assertEquals(Double.parseDouble(index + "." + index), (Double) subrecord.get(FIELD_DECIMAL), 0), + () -> assertNull(subrecord.get(FIELD_NULL)), + () -> assertNotNull(subrecord.schema().field(FIELD_NULL)) + ); + } + + @Override + protected Class getReaderClass() { + return YamlFileReader.class; + } + + @Override + protected String getFileExtension() { + return FILE_EXTENSION; + } + + @Override + protected String readerEncodingConfig() { + return YamlFileReader.FILE_READER_YAML_ENCODING; + } + + @Override + protected String recordPerLineConfig() { + return "UNKNOWN"; + } + + @Override + protected String compressionTypeConfig() { + return YamlFileReader.FILE_READER_YAML_COMPRESSION_TYPE; + } + + @Override + protected String compressionConcatenatedConfig() { + return YamlFileReader.FILE_READER_YAML_COMPRESSION_CONCATENATED; + } + + @Override + protected String deserializationConfigPrefix() { + return YamlFileReader.FILE_READER_YAML_DESERIALIZATION_CONFIGS; + } + + @Override + protected ObjectMapper getObjectMapper() { + return new YAMLMapper(); + } +} diff --git a/src/test/java/com/github/mmolimar/kafka/connect/fs/policy/PolicyTestBase.java b/src/test/java/com/github/mmolimar/kafka/connect/fs/policy/PolicyTestBase.java index 9fbb429..b9ac7dc 100644 --- a/src/test/java/com/github/mmolimar/kafka/connect/fs/policy/PolicyTestBase.java +++ b/src/test/java/com/github/mmolimar/kafka/connect/fs/policy/PolicyTestBase.java @@ -90,6 +90,17 @@ public void invalidConfig(PolicyFsTestConfig fsConfig) { new FsSourceTaskConfig(new HashMap<>()))); } + @ParameterizedTest + @MethodSource("fileSystemConfigProvider") + public void invalidConfigCleanup(PolicyFsTestConfig fsConfig) { + Map originals = fsConfig.getSourceTaskConfig().originalsStrings(); + originals.put(FsSourceTaskConfig.POLICY_CLEANUP, "invalid"); + assertThrows(ConnectException.class, () -> + ReflectionUtils.makePolicy((Class) fsConfig.getSourceTaskConfig() + .getClass(FsSourceTaskConfig.POLICY_CLASS), + new FsSourceTaskConfig(originals))); + } + @ParameterizedTest @MethodSource("fileSystemConfigProvider") public void interruptPolicy(PolicyFsTestConfig fsConfig) throws IOException { @@ -142,6 +153,79 @@ public void oneFilePerFs(PolicyFsTestConfig fsConfig) throws IOException, Interr assertFalse(it.hasNext()); } + @ParameterizedTest + @MethodSource("fileSystemConfigProvider") + public void oneFilePerFsWithMoveCleanup(PolicyFsTestConfig fsConfig) throws IOException { + FileSystem fs = fsConfig.getFs(); + + Path source = new 
Path(fsConfig.getFsUri().toString(), "source"); + Path target = new Path(fsConfig.getFsUri().toString(), "target"); + fs.mkdirs(source); + fs.mkdirs(target); + Map originals = fsConfig.getSourceTaskConfig().originalsStrings(); + originals.put(FsSourceTaskConfig.FS_URIS, source.toString()); + originals.put(FsSourceTaskConfig.POLICY_CLEANUP, AbstractPolicy.Cleanup.MOVE.toString()); + originals.put(FsSourceTaskConfig.POLICY_CLEANUP_MOVE_DIR, target.toString()); + originals.put(FsSourceTaskConfig.POLICY_CLEANUP_MOVE_DIR_PREFIX, "processed_"); + + FsSourceTaskConfig cfg = new FsSourceTaskConfig(originals); + try (Policy policy = ReflectionUtils.makePolicy((Class) fsConfig.getSourceTaskConfig() + .getClass(FsSourceTaskConfig.POLICY_CLASS), cfg)) { + Path tmpDir = new Path(source, String.valueOf(System.nanoTime())); + fs.mkdirs(tmpDir); + String filename = System.nanoTime() + ".txt"; + Path filePath = new Path(tmpDir, filename); + fs.createNewFile(filePath); + + FileMetadata metadata = new FileMetadata(filePath.toString(), 0L, Collections.emptyList()); + Map offset = new HashMap<>(); + offset.put("offset", 1); + offset.put("eof", true); + FileReader reader = policy.offer(metadata, offset); + assertFalse(reader.hasNext()); + + assertFalse(fs.exists(new Path(source, filename))); + assertTrue(fs.exists(new Path(target, "processed_" + filename))); + + metadata = new FileMetadata(System.nanoTime() + ".txt", 0L, Collections.emptyList()); + reader = policy.offer(metadata, offset); + assertFalse(reader.hasNext()); + } + } + + @ParameterizedTest + @MethodSource("fileSystemConfigProvider") + public void oneFilePerFsWithDeleteCleanup(PolicyFsTestConfig fsConfig) throws IOException { + FileSystem fs = fsConfig.getFs(); + + Map originals = fsConfig.getSourceTaskConfig().originalsStrings(); + originals.put(FsSourceTaskConfig.FS_URIS, fsConfig.getFsUri().toString()); + originals.put(FsSourceTaskConfig.POLICY_CLEANUP, AbstractPolicy.Cleanup.DELETE.toString()); + + FsSourceTaskConfig cfg = new FsSourceTaskConfig(originals); + try (Policy policy = ReflectionUtils.makePolicy((Class) fsConfig.getSourceTaskConfig() + .getClass(FsSourceTaskConfig.POLICY_CLASS), cfg)) { + Path tmpDir = new Path(fsConfig.getFsUri().toString(), String.valueOf(System.nanoTime())); + fs.mkdirs(tmpDir); + String filename = System.nanoTime() + ".txt"; + Path filePath = new Path(tmpDir, filename); + fs.createNewFile(filePath); + + FileMetadata metadata = new FileMetadata(filePath.toString(), 0L, Collections.emptyList()); + Map offset = new HashMap<>(); + offset.put("offset", 1); + offset.put("eof", true); + FileReader reader = policy.offer(metadata, offset); + assertFalse(reader.hasNext()); + + assertFalse(fs.exists(new Path(tmpDir, filename))); + + metadata = new FileMetadata(System.nanoTime() + ".txt", 0L, Collections.emptyList()); + reader = policy.offer(metadata, offset); + assertFalse(reader.hasNext()); + } + } + @ParameterizedTest @MethodSource("fileSystemConfigProvider") public void recursiveDirectory(PolicyFsTestConfig fsConfig) throws IOException, InterruptedException { diff --git a/src/test/resources/file/reader/data/cobol/code-pages.cpy b/src/test/resources/file/reader/data/cobol/code-pages.cpy new file mode 100644 index 0000000..dbea4d5 --- /dev/null +++ b/src/test/resources/file/reader/data/cobol/code-pages.cpy @@ -0,0 +1,7 @@ + 01 TRANSDATA. + 05 CURRENCY PIC X(3). + 05 SIGNATURE PIC X(8). + 05 COMPANY-NAME-NP PIC X(15). + 05 COMPANY-ID PIC X(10). + 05 WEALTH-QFY PIC 9(1). + 05 AMOUNT PIC S9(09)V99 BINARY. 
diff --git a/src/test/resources/file/reader/data/cobol/code-pages.dt b/src/test/resources/file/reader/data/cobol/code-pages.dt new file mode 100644 index 0000000..3e8ca49 Binary files /dev/null and b/src/test/resources/file/reader/data/cobol/code-pages.dt differ diff --git a/src/test/resources/file/reader/data/cobol/companies.cpy b/src/test/resources/file/reader/data/cobol/companies.cpy new file mode 100644 index 0000000..9b627f0 --- /dev/null +++ b/src/test/resources/file/reader/data/cobol/companies.cpy @@ -0,0 +1,16 @@ + 01 COMPANY-DETAILS. + 05 SEGMENT-ID PIC X(5). + 05 COMPANY-ID PIC X(10). + 05 STATIC-DETAILS. + 10 COMPANY-NAME PIC X(15). + 10 ADDRESS PIC X(25). + 10 TAXPAYER. + 15 TAXPAYER-TYPE PIC X(1). + 15 TAXPAYER-STR PIC X(8). + 15 TAXPAYER-NUM REDEFINES TAXPAYER-STR + PIC 9(8) COMP. + 10 STRATEGY. + 15 STRATEGY_DETAIL OCCURS 6. + 25 NUM1 PIC 9(7) COMP. + 25 NUM2 PIC 9(7) COMP-3. + diff --git a/src/test/resources/file/reader/data/cobol/companies.dt b/src/test/resources/file/reader/data/cobol/companies.dt new file mode 100644 index 0000000..ef1397d Binary files /dev/null and b/src/test/resources/file/reader/data/cobol/companies.dt differ diff --git a/src/test/resources/file/reader/data/cobol/type-variety.cpy b/src/test/resources/file/reader/data/cobol/type-variety.cpy new file mode 100644 index 0000000..4a4143f --- /dev/null +++ b/src/test/resources/file/reader/data/cobol/type-variety.cpy @@ -0,0 +1,217 @@ + 01 RECORD. + + 10 ID PIC 9(7) BINARY. + + 10 STRING-VAL PIC X(10). + + 10 NUM-STR-INT01 PIC 9(1). + 10 NUM-STR-INT02 PIC 9(2). + 10 NUM-STR-INT03 PIC 9(3). + 10 NUM-STR-INT04 PIC 9(4). + 10 NUM-STR-INT05 PIC 9(5). + 10 NUM-STR-INT06 PIC 9(8). + 10 NUM-STR-INT07 PIC 9(9). + 10 NUM-STR-INT08 PIC 9(10). + 10 NUM-STR-INT09 PIC 9(11). + 10 NUM-STR-INT10 PIC 9(17). + 10 NUM-STR-INT11 PIC 9(18). + 10 NUM-STR-INT12 PIC 9(19). + 10 NUM-STR-INT13 PIC 9(20). + 10 NUM-STR-INT14 PIC 9(37). + + 10 NUM-STR-SINT02 PIC S9(2). + 10 NUM-STR-SINT03 PIC S9(3). + 10 NUM-STR-SINT04 PIC S9(4). + 10 NUM-STR-SINT05 PIC S9(5). + 10 NUM-STR-SINT06 PIC S9(8). + 10 NUM-STR-SINT07 PIC S9(9). + 10 NUM-STR-SINT08 PIC S9(10). + 10 NUM-STR-SINT09 PIC S9(11). + 10 NUM-STR-SINT10 PIC S9(17). + 10 NUM-STR-SINT11 PIC S9(18). + 10 NUM-STR-SINT12 PIC S9(19). + 10 NUM-STR-SINT13 PIC S9(20). + 10 NUM-STR-SINT14 PIC S9(37). + + 10 NUM-STR-DEC01 PIC 99V9. + 10 NUM-STR-DEC02 PIC 99V99. + 10 NUM-STR-DEC03 PIC 9(3)V99. + 10 NUM-STR-DEC04 PIC 9(4)V9(4). + 10 NUM-STR-DEC05 PIC 9(5)V9(4). + 10 NUM-STR-DEC06 PIC 9(5)V9(5). + 10 NUM-STR-DEC07 PIC 9(15)V99. + 10 NUM-STR-DEC08 PIC 9(16)V99. + 10 NUM-STR-DEC09 PIC 9(17)V99. + 10 NUM-STR-DEC10 PIC 9(18)V9(10). + 10 NUM-STR-SDEC01 PIC S99V9. + 10 NUM-STR-SDEC02 PIC S99V99. + 10 NUM-STR-SDEC03 PIC S9(3)V99. + 10 NUM-STR-SDEC04 PIC S9(4)V9(4). + 10 NUM-STR-SDEC05 PIC S9(5)V9(4). + 10 NUM-STR-SDEC06 PIC S9(5)V9(5). + 10 NUM-STR-SDEC07 PIC S9(15)V99. + 10 NUM-STR-SDEC08 PIC S9(16)V99. + 10 NUM-STR-SDEC09 PIC S9(17)V99. + 10 NUM-STR-SDEC10 PIC S9(18)V9(10). + 10 NUM-STR-EDEC03 PIC S9(3).99. + 10 NUM-STR-EDEC04 PIC S9(4).9(4). + 10 NUM-STR-EDEC05 PIC S9(5).9(4). + 10 NUM-STR-EDEC06 PIC S9(5).9(5). + + 10 NUM-BIN-INT01 PIC 9(1) COMP. + 10 NUM-BIN-INT02 PIC 9(2) COMP. + 10 NUM-BIN-INT03 PIC 9(3) COMP-0. + 10 NUM-BIN-INT04 PIC 9(4) COMP-4. + 10 NUM-BIN-INT05 PIC 9(5) COMP-5. + 10 NUM-BIN-INT06 PIC 9(8) BINARY. + 10 NUM-BIN-INT07 PIC 9(9) BINARY. + 10 NUM-BIN-INT08 PIC 9(10) BINARY. + 10 NUM-BIN-INT09 PIC 9(11) BINARY. + 10 NUM-BIN-INT10 PIC 9(17) BINARY. + 10 NUM-BIN-INT11 PIC 9(18) BINARY. 
+ 10 NUM-BIN-INT12 PIC 9(19) BINARY. + 10 NUM-BIN-INT13 PIC 9(20) BINARY. + 10 NUM-BIN-INT14 PIC 9(37) BINARY. + 10 NUM-SBIN-SINT01 PIC S9(1) COMP. + 10 NUM-SBIN-SINT02 PIC S9(2) COMP. + 10 NUM-SBIN-SINT03 PIC S9(3) COMP. + 10 NUM-SBIN-SINT04 PIC S9(4) COMP. + 10 NUM-SBIN-SINT05 PIC S9(5) COMP. + 10 NUM-SBIN-SINT06 PIC S9(8) BINARY. + 10 NUM-SBIN-SINT07 PIC S9(9) BINARY. + 10 NUM-SBIN-SINT08 PIC S9(10) BINARY. + 10 NUM-SBIN-SINT09 PIC S9(11) BINARY. + 10 NUM-SBIN-SINT10 PIC S9(17) BINARY. + 10 NUM-SBIN-SINT11 PIC S9(18) BINARY. + 10 NUM-SBIN-SINT12 PIC S9(19) BINARY. + 10 NUM-SBIN-SINT13 PIC S9(20) BINARY. + 10 NUM-SBIN-SINT14 PIC S9(37) BINARY. + + 10 NUM-BIN-DEC01 PIC 99V9 COMP. + 10 NUM-BIN-DEC02 PIC 99V99 COMP. + 10 NUM-BIN-DEC03 PIC 9(3)V99 COMP. + 10 NUM-BIN-DEC04 PIC 9(4)V9(4) COMP. + 10 NUM-BIN-DEC05 PIC 9(5)V9(4) COMP. + 10 NUM-BIN-DEC06 PIC 9(5)V9(5) COMP. + 10 NUM-BIN-DEC07 PIC 9(15)V99 COMP. + 10 NUM-BIN-DEC08 PIC 9(16)V99 COMP. + 10 NUM-BIN-DEC09 PIC 9(17)V99 COMP. + 10 NUM-BIN-DEC10 PIC 9(18)V9(10) COMP. + 10 NUM-SBIN-DEC01 PIC S99V9 COMP. + 10 NUM-SBIN-DEC02 PIC S99V99 COMP. + 10 NUM-SBIN-DEC03 PIC S9(3)V99 COMP. + 10 NUM-SBIN-DEC04 PIC S9(4)V9(4) COMP. + 10 NUM-SBIN-DEC05 PIC S9(5)V9(4) COMP. + 10 NUM-SBIN-DEC06 PIC S9(5)V9(5) COMP. + 10 NUM-SBIN-DEC07 PIC S9(15)V99 COMP. + 10 NUM-SBIN-DEC08 PIC S9(16)V99 COMP. + 10 NUM-SBIN-DEC09 PIC S9(17)V99 COMP. + 10 NUM-SBIN-DEC10 PIC S9(18)V9(10) COMP. + + 10 NUM-BCD-INT01 PIC 9(1) COMP-3. + 10 NUM-BCD-INT02 PIC 9(2) COMP-3. + 10 NUM-BCD-INT03 PIC 9(3) COMP-3. + 10 NUM-BCD-INT04 PIC 9(4) COMP-3. + 10 NUM-BCD-INT05 PIC 9(5) COMP-3. + 10 NUM-BCD-INT06 PIC 9(8) COMP-3. + 10 NUM-BCD-INT07 PIC 9(9) COMP-3. + 10 NUM-BCD-INT08 PIC 9(10) COMP-3. + 10 NUM-BCD-INT09 PIC 9(11) COMP-3. + 10 NUM-BCD-INT10 PIC 9(17) COMP-3. + 10 NUM-BCD-INT11 PIC 9(18) COMP-3. + 10 NUM-BCD-INT12 PIC 9(19) COMP-3. + 10 NUM-BCD-INT13 PIC 9(20) COMP-3. + 10 NUM-BCD-INT14 PIC 9(37) COMP-3. + + 10 NUM-BCD-SINT01 PIC S9(1) COMP-3. + 10 NUM-BCD-SINT02 PIC S9(2) COMP-3. + 10 NUM-BCD-SINT03 PIC S9(3) COMP-3. + 10 NUM-BCD-SINT04 PIC S9(4) COMP-3. + 10 NUM-BCD-SINT05 PIC S9(5) COMP-3. + 10 NUM-BCD-SINT06 PIC S9(8) COMP-3. + 10 NUM-BCD-SINT07 PIC S9(9) COMP-3. + 10 NUM-BCD-SINT08 PIC S9(10) COMP-3. + 10 NUM-BCD-SINT09 PIC S9(11) COMP-3. + 10 NUM-BCD-SINT10 PIC S9(17) COMP-3. + 10 NUM-BCD-SINT11 PIC S9(18) COMP-3. + 10 NUM-BCD-SINT12 PIC S9(19) COMP-3. + 10 NUM-BCD-SINT13 PIC S9(20) COMP-3. + 10 NUM-BCD-SINT14 PIC S9(37) COMP-3. + + 10 NUM-BCD-DEC01 PIC 99V9 COMP-3. + 10 NUM-BCD-DEC02 PIC 99V99 COMP-3. + 10 NUM-BCD-DEC03 PIC 9(3)V99 COMP-3. + 10 NUM-BCD-DEC04 PIC 9(4)V9(4) COMP-3. + 10 NUM-BCD-DEC05 PIC 9(5)V9(4) COMP-3. + 10 NUM-BCD-DEC06 PIC 9(5)V9(5) COMP-3. + 10 NUM-BCD-DEC07 PIC 9(15)V99 COMP-3. + 10 NUM-BCD-DEC08 PIC 9(16)V99 COMP-3. + 10 NUM-BCD-DEC09 PIC 9(17)V99 COMP-3. + 10 NUM-BCD-DEC10 PIC 9(18)V9(10) COMP-3. + 10 NUM-BCD-SDEC01 PIC S99V9 COMP-3. + 10 NUM-BCD-SDEC02 PIC S99V99 COMP-3. + 10 NUM-BCD-SDEC03 PIC S9(3)V99 COMP-3. + 10 NUM-BCD-SDEC04 PIC S9(4)V9(4) COMP-3. + 10 NUM-BCD-SDEC05 PIC S9(5)V9(4) COMP-3. + 10 NUM-BCD-SDEC06 PIC S9(5)V9(5) COMP-3. + 10 NUM-BCD-SDEC07 PIC S9(15)V99 COMP-3. + 10 NUM-BCD-SDEC08 PIC S9(16)V99 COMP-3. + 10 NUM-BCD-SDEC09 PIC S9(17)V99 COMP-3. + 10 NUM-BCD-SDEC10 PIC S9(18)V9(10) COMP-3. + + 10 NUM-SL-STR-INT01 PIC S9(9) SIGN IS + LEADING SEPARATE. + 10 NUM-SL-STR-DEC01 PIC 99V99 SIGN IS + LEADING SEPARATE CHARACTER. + 10 NUM-ST-STR-INT01 PIC S9(9) SIGN IS + TRAILING SEPARATE. + 10 NUM-ST-STR-DEC01 PIC 99V99 SIGN + TRAILING SEPARATE. 
+ 10 NUM-SLI-STR-DEC01 PIC SV9(7) SIGN LEADING. + 10 NUM-STI-STR-DEC01 PIC SV9(7) SIGN TRAILING. + 10 NUM-SLI-DEBUG PIC X(7). + 10 NUM-STI-DEBUG PIC X(7). + + 10 FLOAT-01 COMP-1. + 10 DOUBLE-01 COMP-2. + + 10 COMMON-8-BIN PIC 9(8) BINARY. + 10 COMMON-S3-BIN PIC S9(3) BINARY. + 10 COMMON-S94COMP PIC S9(04) COMP. + 10 COMMON-S8-BIN PIC S9(8) BINARY. + 10 COMMON-DDC97-BIN PIC S9V9(7) BINARY. + 10 COMMON-97COMP3 PIC 9(07) COMP-3. + 10 COMMON-915COMP3 PIC 9(15) COMP-3. + 10 COMMON-S95COMP3 PIC S9(5) COMP-3. + 10 COMMON-S999DCCOMP3 PIC S9(09)V99 COMP-3. + 10 COMMON-S913COMP3 PIC S9(13) COMP-3. + 10 COMMON-S913DCCOMP3 PIC S9(13)V99 COMP-3. + 10 COMMON-S911DCC2 PIC S9(11)V99 COMP-3. + 10 COMMON-S910DCC3 PIC S9(10)V999 COMP-3. + 10 COMMON-S03DDC PIC SV9(5) COMP-3. + 10 COMMON-U03DDC PIC V9(5) COMP-3. + + 10 COMMON-UPC5DDC PIC PPP9(5) COMP-3. + 10 COMMON-SPC5DDC PIC SPP99999 COMP-3. + 10 COMMON-UPI5DDC PIC 9(5)PPP COMP-3. + 10 COMMON-SPI5DDC PIC S99999PPP COMP-3. + + 10 COMMON-UPC5DISP PIC SPPP9(5). + 10 COMMON-UPI5DISP PIC S9(5)PPP. + + 10 COMMON-UPC1BIN PIC SPPP9 COMP. + 10 COMMON-UPI1BIN PIC S9PPP COMP. + 10 COMMON-UPC3BIN PIC SPPP9(3) COMP. + 10 COMMON-UPI3BIN PIC S9(3)PPP COMP. + 10 COMMON-UPC5BIN PIC SPPP9(5) COMP. + 10 COMMON-UPI5BIN PIC S9(5)PPP COMP. + 10 COMMON-UPC10BIN PIC SPPP9(10) COMP. + 10 COMMON-UPI10BIN PIC S9(10)PPP COMP. + + 10 EX-NUM-INT01 PIC +9(8). + 10 EX-NUM-INT02 PIC 9(8)+. + 10 EX-NUM-INT03 PIC -9(8). + 10 EX-NUM-INT04 PIC Z(8)-. + 10 EX-NUM-DEC01 PIC +9(6)V99. + 10 EX-NUM-DEC02 PIC Z(6)VZZ-. + 10 EX-NUM-DEC03 PIC 9(6).99-. diff --git a/src/test/resources/file/reader/data/cobol/type-variety.dt b/src/test/resources/file/reader/data/cobol/type-variety.dt new file mode 100644 index 0000000..04cd822 Binary files /dev/null and b/src/test/resources/file/reader/data/cobol/type-variety.dt differ