A Conduit connector for MySQL, providing both a source and a destination.
Run `make build` to build the connector.
Run `make test` to run all the unit tests. Run `make test-integration` to run the integration tests.
The Docker Compose file at `test/docker-compose.yml` can be used to run the required resources locally. It includes Adminer for database management.
Use the `TRACE=true` environment variable to enable trace logs when running tests.
A source connector pulls data from an external resource and pushes it to downstream resources via Conduit.
Snapshot mode is the first stage of the source sync process. It reads all rows from the configured tables as record snapshots.
In snapshot mode, the record payload consists of `opencdc.StructuredData`, with each key being a column name and each value being that column's value.
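For example, a snapshot record for a row in a hypothetical `users` table could carry a payload shaped like this (a minimal sketch using the `opencdc` package from `conduit-commons`; the table and column names are illustrative):

```go
package main

import (
	"fmt"

	"github.com/conduitio/conduit-commons/opencdc"
)

func main() {
	// Each key is a column name, each value is that column's value for the row.
	payload := opencdc.StructuredData{
		"id":    1,
		"name":  "Alice",
		"email": "alice@example.com",
	}
	fmt.Println(payload)
}
```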
When the connector switches to CDC mode, it starts streaming changes from the binlog position obtained at the start of the snapshot. It uses the row-based binlog format to capture detailed changes at the individual row level.
By default, the connector errors out if it finds a table that has neither a primary key nor a configured sorting column, as a consistent snapshot can't be guaranteed. Table changes that happen during the snapshot will, however, be captured by CDC mode.
The `unsafeSnapshot` option allows such tables to be snapshotted anyway. As of writing, the unsafe snapshot is implemented using batches with `LIMIT` and `OFFSET`, so expect it to be slow for large tables. You can optimize the snapshot by specifying a sorting column.
The position of the table is currently not recorded, so the unsafe snapshot will restart from zero every time.
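To illustrate the difference, here is a rough sketch (not the connector's actual code) of the two query patterns, assuming a hypothetical `users` table, a sorting column `id`, and a fetch size of 50000:

```go
package main

import "fmt"

func main() {
	fetchSize := 50000

	// Unsafe snapshot: OFFSET grows with every batch, so MySQL has to scan and
	// discard all previously fetched rows, which gets slower as the table grows.
	offset := 0
	fmt.Printf("SELECT * FROM users LIMIT %d OFFSET %d;\n", fetchSize, offset)

	// With a sorting column: seek past the last value seen in the previous batch
	// instead of skipping rows, which stays fast regardless of table size.
	lastSeenID := 0
	fmt.Printf("SELECT * FROM users WHERE id > %d ORDER BY id LIMIT %d;\n", lastSeenID, fetchSize)
}
```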
Source configuration options:

name | description | required | default | example |
---|---|---|---|---|
`dsn` | The data source name for the MySQL database. | true | | `<user>:<password>@tcp(127.0.0.1:3306)/<db>` |
`tables` | The list of tables to pull data from. | true | | `users,posts,admins` |
`tableConfig.<table name>.sortingColumn` (*) | The custom column to use to sort the rows during the snapshot. Use this for tables that don't have a proper auto-incrementing primary key. | false | | `tableConfig.users.sortingColumn` as the key, `id` as the value |
`fetchSize` | Limits how many rows are retrieved on each database fetch. | false | `50000` | `50000` |
`unsafeSnapshot` | Allows snapshotting tables that have neither a primary key nor a sorting column configured via `tableConfig.*.sortingColumn`. | false | | |
(*): you can use any ordinal MySQL data type.
The connector is tested against MySQL v8.0. Compatibility with older versions isn't guaranteed. The following requirements must be met (a sketch for verifying the binlog settings follows the list):
- Binary Log (binlog) must be enabled.
- Binlog format must be set to ROW.
- Binlog row image must be set to FULL.
- Tables must have sortable primary keys.
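A minimal sketch for verifying the binlog settings above before starting a pipeline, assuming the `go-sql-driver/mysql` driver and placeholder credentials:

```go
package main

import (
	"database/sql"
	"fmt"
	"log"

	_ "github.com/go-sql-driver/mysql"
)

func main() {
	db, err := sql.Open("mysql", "user:password@tcp(127.0.0.1:3306)/db")
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	var logBin, format, rowImage string
	// Expect log_bin = 1 (enabled), binlog_format = ROW, binlog_row_image = FULL.
	err = db.QueryRow("SELECT @@log_bin, @@binlog_format, @@binlog_row_image").
		Scan(&logBin, &format, &rowImage)
	if err != nil {
		log.Fatal(err)
	}
	fmt.Printf("log_bin=%s binlog_format=%s binlog_row_image=%s\n", logBin, format, rowImage)
}
```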
For Snapshot and CDC modes, the following privileges are required (an example grant is shown after the list):
- SELECT
- LOCK TABLES
- RELOAD
- REPLICATION CLIENT
- REPLICATION SLAVE
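As an example, the privileges above could be granted to a hypothetical `conduit` user roughly like this (a sketch assuming an administrative connection; `RELOAD`, `REPLICATION CLIENT`, and `REPLICATION SLAVE` are global privileges, so the grant targets `*.*`):

```go
package main

import (
	"database/sql"
	"log"

	_ "github.com/go-sql-driver/mysql"
)

func main() {
	// Connect with an account that is allowed to grant privileges.
	db, err := sql.Open("mysql", "root:root@tcp(127.0.0.1:3306)/")
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	// Grant the privileges required for snapshot and CDC modes to a hypothetical user.
	_, err = db.Exec("GRANT SELECT, LOCK TABLES, RELOAD, REPLICATION CLIENT, REPLICATION SLAVE ON *.* TO 'conduit'@'%'")
	if err != nil {
		log.Fatal(err)
	}
}
```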
The MySQL destination takes an `opencdc.Record` and parses it into a valid SQL query. Each record is individually parsed and upserted. Writing in batches is planned, which should greatly improve performance over the current implementation.
If the target table contains a column with a unique constraint (this includes PRIMARY KEY and UNIQUE indexes), records will be upserted. Otherwise, they will be appended. Support for updating tables without unique constraints is tracked here.
If the target table already contains a record with the same key, the Destination will upsert it with the newly received values. Because keys must be unique, this can overwrite and thus potentially lose data, so keys should be assigned correctly from the Source. If there is no key, the record will simply be appended.
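As a rough sketch of this behavior (not the connector's actual implementation), a structured payload could be turned into an upsert for a table with a unique key like this, assuming a hypothetical `users` table:

```go
package main

import (
	"fmt"
	"strings"
)

// buildUpsert turns a row (column name -> value) into an
// INSERT ... ON DUPLICATE KEY UPDATE statement with placeholder args.
// Note: Go maps are unordered, so the column order is not deterministic here.
func buildUpsert(table string, row map[string]any) (string, []any) {
	cols := make([]string, 0, len(row))
	placeholders := make([]string, 0, len(row))
	updates := make([]string, 0, len(row))
	args := make([]any, 0, len(row))

	for col, val := range row {
		cols = append(cols, col)
		placeholders = append(placeholders, "?")
		updates = append(updates, fmt.Sprintf("%s = VALUES(%s)", col, col))
		args = append(args, val)
	}

	query := fmt.Sprintf(
		"INSERT INTO %s (%s) VALUES (%s) ON DUPLICATE KEY UPDATE %s",
		table,
		strings.Join(cols, ", "),
		strings.Join(placeholders, ", "),
		strings.Join(updates, ", "),
	)
	return query, args
}

func main() {
	query, args := buildUpsert("users", map[string]any{
		"user_id": 1,
		"name":    "Alice",
	})
	fmt.Println(query)
	fmt.Println(args)
}
```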
(Planned). You can upvote the following issue to register interest in getting this feature implemented sooner.
Destination configuration options:

name | description | required | example |
---|---|---|---|
`dsn` | The data source name for the MySQL database. | true | `<user>:<password>@tcp(127.0.0.1:3306)/<db>` |
`table` | The target table to write the record to. | true | `users` |
`key` | Key represents the column name to use to delete records. | true | `user_id` |