Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(ingest/kafka-connect): support MongoSourceConnector #6416

Merged
merged 20 commits into from
Dec 5, 2022
Merged
Show file tree
Hide file tree
Changes from 19 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
73 changes: 71 additions & 2 deletions metadata-ingestion/src/datahub/ingestion/source/kafka_connect.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ class KafkaConnectSourceConfig(DatasetLineageProviderConfigBase):
)
generic_connectors: List[GenericConnectorConfig] = Field(
default=[],
description="Provide lineage graph for sources connectors other than Confluent JDBC Source Connector or Debezium Source Connector",
description="Provide lineage graph for sources connectors other than Confluent JDBC Source Connector, Debezium Source Connector, and Mongo Source Connector",
)


Expand Down Expand Up @@ -556,6 +556,68 @@ def _extract_lineages(self):
return


@dataclass
class MongoSourceConnector:
# https://www.mongodb.com/docs/kafka-connector/current/source-connector/

connector_manifest: ConnectorManifest

def __init__(
self, connector_manifest: ConnectorManifest, config: KafkaConnectSourceConfig
) -> None:
self.connector_manifest = connector_manifest
self.config = config
self._extract_lineages()

@dataclass
class MongoSourceParser:
db_connection_url: Optional[str]
source_platform: str
database_name: Optional[str]
topic_prefix: Optional[str]
transforms: List[str]

def get_parser(
self,
connector_manifest: ConnectorManifest,
) -> MongoSourceParser:
parser = self.MongoSourceParser(
db_connection_url=connector_manifest.config.get("connection.uri"),
source_platform="mongodb",
database_name=connector_manifest.config.get("database"),
topic_prefix=connector_manifest.config.get("topic_prefix"),
transforms=connector_manifest.config["transforms"].split(",")
if "transforms" in connector_manifest.config
else [],
)

return parser

def _extract_lineages(self):
lineages: List[KafkaConnectLineage] = list()
parser = self.get_parser(self.connector_manifest)
source_platform = parser.source_platform
topic_naming_pattern = r"mongodb\.(\w+)\.(\w+)"

if not self.connector_manifest.topic_names:
return lineages

for topic in self.connector_manifest.topic_names:
found = re.search(re.compile(topic_naming_pattern), topic)

if found:
table_name = get_dataset_name(found.group(1), None, found.group(2))

lineage = KafkaConnectLineage(
source_dataset=table_name,
source_platform=source_platform,
target_dataset=topic,
target_platform="kafka",
)
lineages.append(lineage)
self.connector_manifest.lineages = lineages


@dataclass
class DebeziumSourceConnector:
connector_manifest: ConnectorManifest
Expand Down Expand Up @@ -846,7 +908,7 @@ class KafkaConnectSource(Source):
Current limitations:

- works only for
- JDBC and Debezium source connectors
- JDBC, Debezium, and Mongo source connectors
- Generic connectors with user-defined lineage graph
- BigQuery sink connector
"""
Expand Down Expand Up @@ -941,6 +1003,13 @@ def get_connectors_manifest(self) -> List[ConnectorManifest]:
connector_manifest = DebeziumSourceConnector(
connector_manifest=connector_manifest, config=self.config
).connector_manifest
elif (
connector_manifest.config.get("connector.class", "")
== "com.mongodb.kafka.connect.MongoSourceConnector"
):
connector_manifest = MongoSourceConnector(
connector_manifest=connector_manifest, config=self.config
).connector_manifest
else:
# Find the target connector object in the list, or log an error if unknown.
target_connector = None
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ services:
- zookeeper
- broker
- mysqldb
- mongo
ports:
- "58083:58083"
# volumes:
Expand All @@ -34,6 +35,8 @@ services:
#
#confluent-hub install --no-prompt wepay/kafka-connect-bigquery:1.6.8
#
confluent-hub install --no-prompt mongodb/kafka-connect-mongodb:1.8.0
#
curl -k -SL "https://dev.mysql.com/get/Downloads/Connector-J/mysql-connector-java-8.0.27.tar.gz" \
| tar -xzf - -C /usr/share/confluent-hub-components/confluentinc-kafka-connect-jdbc/lib \
--strip-components=1 mysql-connector-java-8.0.27/mysql-connector-java-8.0.27.jar
Expand Down Expand Up @@ -71,5 +74,21 @@ services:
ports:
- "5432:5432"

mongo:
hostname: mongo
image: mongo:4.2.9
container_name: "test_mongo"
ports:
- "27017:27017"
command: --replSet rs0
environment:
- MONGO_INITDB_ROOT_USERNAME=admin
- MONGO_INITDB_ROOT_PASSWORD=admin
- MONGO_INITDB_DATABASE=test_db
- MONGO_INITDB_USERNAME=kafka-connector
- MONGO_INITDB_PASSWORD=password
volumes:
- ./../kafka-connect/setup/conf/mongo-init.sh:/docker-entrypoint-initdb.d/mongo-init.sh:ro

volumes:
test_zkdata:
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
[
{
"entityType": "dataFlow",
"entityUrn": "urn:li:dataFlow:(kafka-connect,source_mongodb_connector,PROD)",
"changeType": "UPSERT",
"aspectName": "dataFlowInfo",
"aspect": {
"value": "{\"customProperties\": {}, \"name\": \"source_mongodb_connector\", \"description\": \"Source connector using `com.mongodb.kafka.connect.MongoSourceConnector` plugin.\"}",
"contentType": "application/json"
},
"systemMetadata": {
"lastObserved": 1635166800000,
"runId": "kafka-connect-run"
}
},
{
"entityType": "dataJob",
"entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(kafka-connect,source_mongodb_connector,PROD),test_db.purchases)",
"changeType": "UPSERT",
"aspectName": "dataJobInfo",
"aspect": {
"value": "{\"customProperties\": {}, \"name\": \"source_mongodb_connector:test_db.purchases\", \"type\": {\"string\": \"COMMAND\"}}",
"contentType": "application/json"
},
"systemMetadata": {
"lastObserved": 1635166800000,
"runId": "kafka-connect-run"
}
},
{
"entityType": "dataJob",
"entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(kafka-connect,source_mongodb_connector,PROD),test_db.purchases)",
"changeType": "UPSERT",
"aspectName": "dataJobInputOutput",
"aspect": {
"value": "{\"inputDatasets\": [\"urn:li:dataset:(urn:li:dataPlatform:mongodb,test_db.purchases,PROD)\"], \"outputDatasets\": [\"urn:li:dataset:(urn:li:dataPlatform:kafka,mongodb.test_db.purchases,PROD)\"]}",
"contentType": "application/json"
},
"systemMetadata": {
"lastObserved": 1635166800000,
"runId": "kafka-connect-run"
}
},
{
"entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:kafka,mongodb.test_db.purchases,PROD)",
"changeType": "UPSERT",
"aspectName": "dataPlatformInstance",
"aspect": {
"value": "{\"platform\": \"urn:li:dataPlatform:kafka\"}",
"contentType": "application/json"
},
"systemMetadata": {
"lastObserved": 1635166800000,
"runId": "kafka-connect-run"
}
},
{
"entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:mongodb,test_db.purchases,PROD)",
"changeType": "UPSERT",
"aspectName": "dataPlatformInstance",
"aspect": {
"value": "{\"platform\": \"urn:li:dataPlatform:mongodb\"}",
"contentType": "application/json"
},
"systemMetadata": {
"lastObserved": 1635166800000,
"runId": "kafka-connect-run"
}
}
]
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
#!/bin/bash

mongo -- "$MONGO_INITDB_DATABASE" <<-EOJS
conn = new Mongo();
db = conn.getDB("test_db");
db.purchases.insertOne({ _id: 3, item: "lamp post", price: 12 });
db.purchases.insertOne({ _id: 4, item: "lamp post", price: 13 });
EOJS


{
frsann marked this conversation as resolved.
Show resolved Hide resolved
sleep 3 &&
mongo -- "$MONGO_INITDB_DATABASE" <<-EOJS
var rootUser = '$MONGO_INITDB_ROOT_USERNAME';
var rootPassword = '$MONGO_INITDB_ROOT_PASSWORD';
var admin = db.getSiblingDB('admin');
admin.auth(rootUser, rootPassword);
EOJS
} &



Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ def test_kafka_connect_ingest(docker_compose_runner, pytestconfig, tmp_path, moc
).status_code
== 200,
)

# Creating MySQL source with no transformations , only topic prefix
r = requests.post(
"http://localhost:58083/connectors",
Expand Down Expand Up @@ -252,3 +253,88 @@ def test_kafka_connect_ingest(docker_compose_runner, pytestconfig, tmp_path, moc
golden_path=test_resources_dir / "kafka_connect_mces_golden.json",
ignore_paths=[],
)


@freeze_time(FROZEN_TIME)
@pytest.mark.integration_batch_1
def test_kafka_connect_mongosourceconnect_ingest(
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@shirshanka I separated the mongo source connector test from the original. The test passed locally, but for some reason it fails in CI. It's bit hard to debug, but either the collection in Mongo does not get created as it should, or there is some race condition that causes the connector to not be ready when the ingestion is run.

What do you think we should do? Skip the entire test? Comment out the aspects related to the topics and only test the existence of the connector?

docker_compose_runner, pytestconfig, tmp_path, mock_time
):
test_resources_dir = pytestconfig.rootpath / "tests/integration/kafka-connect"
test_resources_dir_kafka = pytestconfig.rootpath / "tests/integration/kafka"

# Share Compose configurations between files and projects
# https://docs.docker.com/compose/extends/
docker_compose_file = [
str(test_resources_dir_kafka / "docker-compose.yml"),
str(test_resources_dir / "docker-compose.override.yml"),
]
with docker_compose_runner(docker_compose_file, "kafka-connect") as docker_services:
time.sleep(10)
# Run the setup.sql file to populate the database.
command = 'docker exec test_mongo mongo admin -u admin -p admin --eval "rs.initiate();"'
ret = subprocess.run(
command, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE
)
assert ret.returncode == 0
time.sleep(10)

wait_for_port(docker_services, "test_broker", 59092, timeout=120)
wait_for_port(docker_services, "test_connect", 58083, timeout=120)
docker_services.wait_until_responsive(
timeout=30,
pause=1,
check=lambda: requests.get(
"http://localhost:58083/connectors",
).status_code
== 200,
)

# Creating MongoDB source
r = requests.post(
"http://localhost:58083/connectors",
headers={"Content-Type": "application/json"},
data=r"""{
"name": "source_mongodb_connector",
"config": {
"tasks.max": "1",
"connector.class": "com.mongodb.kafka.connect.MongoSourceConnector",
"connection.uri": "mongodb://admin:admin@test_mongo:27017",
"topic.prefix": "mongodb",
"database": "test_db",
"collection": "purchases",
"copy.existing": true,
"copy.existing.namespace.regex": "test_db.purchases",
"change.stream.full.document": "updateLookup",
"topic.creation.enable": "true",
"topic.creation.default.replication.factor": "-1",
"topic.creation.default.partitions": "-1",
"output.json.formatter": "com.mongodb.kafka.connect.source.json.formatter.SimplifiedJson",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "org.apache.kafka.connect.storage.StringConverter",
"key.converter.schemas.enable": false,
"value.converter.schemas.enable": false,
"output.format.key": "schema",
"output.format.value": "json",
"output.schema.infer.value": false,
"publish.full.document.only":true
}
}""",
)
r.raise_for_status()
assert r.status_code == 201 # Created

# Give time for connectors to process the table data
frsann marked this conversation as resolved.
Show resolved Hide resolved
time.sleep(60)

# Run the metadata ingestion pipeline.
config_file = (test_resources_dir / "kafka_connect_to_file.yml").resolve()
run_datahub_cmd(["ingest", "-c", f"{config_file}"], tmp_path=tmp_path)

# Verify the output.
mce_helpers.check_golden_file(
pytestconfig,
output_path=tmp_path / "kafka_connect_mces.json",
golden_path=test_resources_dir / "kafka_connect_mongo_mces_golden.json",
ignore_paths=[],
)