feat(config): support alternate hashing algorithm for doc id #10423

Merged: 52 commits, Aug 6, 2024

Commits (showing changes from 45 of 52 commits):
151c34f  md5 to sha256 (pinakipb2, May 3, 2024)
bdb04e6  add scope (pinakipb2, May 3, 2024)
19ca916  Add component (pinakipb2, May 3, 2024)
8ce0ab7  Adding PropertySource (pinakipb2, May 3, 2024)
a3460bf  Adding component (pinakipb2, May 3, 2024)
7af93c5  Use env vars for test (pinakipb2, May 3, 2024)
d127d8c  lint (pinakipb2, May 3, 2024)
b320b22  Updates (pinakipb2, May 3, 2024)
d980b52  Inject env var (pinakipb2, May 3, 2024)
b67593a  updates (pinakipb2, May 3, 2024)
30a8781  Update test (pinakipb2, May 3, 2024)
6e7f128  add env (pinakipb2, May 3, 2024)
8a9ba47  inject env (pinakipb2, May 3, 2024)
f598f3f  Updates (pinakipb2, May 3, 2024)
ec66c70  Add env var to xml (pinakipb2, May 3, 2024)
cfb96eb  Update test (pinakipb2, May 4, 2024)
be75dbe  Inject env (pinakipb2, May 4, 2024)
f5aa13f  change scope (pinakipb2, May 4, 2024)
31f8466  Update scope (pinakipb2, May 4, 2024)
00ca5c0  Public (pinakipb2, May 4, 2024)
26fe2af  Remove PropertySource (pinakipb2, May 6, 2024)
7942b21  Remove parameter (pinakipb2, May 6, 2024)
58ef4b0  Updates (pinakipb2, May 6, 2024)
50b80ad  Merge branch 'master' into pb-md5-vulnerability (david-leifker, Jun 26, 2024)
794d9f5  Merge branch 'datahub-project:master' into pb-md5-vulnerability (pinakipb2, Jun 27, 2024)
183f29a  Update TimeseriesAspectTransformer.java (pinakipb2, Jun 27, 2024)
9a90977  Update Edge.java (pinakipb2, Jun 27, 2024)
27c7e55  Update docker.env (pinakipb2, Jun 27, 2024)
0da6a69  Update docker-without-neo4j.env (pinakipb2, Jun 27, 2024)
3d08df5  Merge branch 'master' into pb-md5-vulnerability (jjoyce0510, Jun 28, 2024)
c3054f2  Merge branch 'master' into pb-md5-vulnerability (jjoyce0510, Jun 28, 2024)
e1b8a6a  Update run-quickstart.sh (pinakipb2, Jul 2, 2024)
f6cf681  Merge branch 'master' into pb-md5-vulnerability (david-leifker, Jul 31, 2024)
e3a1a16  Update ElasticSearchSystemMetadataService.java (pinakipb2, Jul 31, 2024)
09353e1  Merge branch 'master' into pb-md5-vulnerability (david-leifker, Jul 31, 2024)
c2f7992  Merge branch 'master' into pb-md5-vulnerability (david-leifker, Jul 31, 2024)
d084f22  Merge branch 'master' into pb-md5-vulnerability (david-leifker, Jul 31, 2024)
36de029  Merge branch 'master' into pb-md5-vulnerability (david-leifker, Aug 3, 2024)
bb62edb  Update yaml files (pinakipb2, Aug 3, 2024)
1d4aeef  Fetch env var (pinakipb2, Aug 3, 2024)
d81d314  export env var (pinakipb2, Aug 3, 2024)
cb10f26  Add env in gradle file (pinakipb2, Aug 3, 2024)
6a4b10d  Update cypress test env (pinakipb2, Aug 3, 2024)
9d17ba2  Merge branch 'master' into pb-md5-vulnerability (pinakipb2, Aug 4, 2024)
6d83634  Add env (pinakipb2, Aug 4, 2024)
83535dc  Update docker compose generates (pinakipb2, Aug 6, 2024)
1a815ee  Merge remote-tracking branch 'origin/master' into pb-md5-vulnerability (pinakipb2, Aug 6, 2024)
b425468  Update docker compose generates - consumers (pinakipb2, Aug 6, 2024)
7870ef1  Update consumers generated (pinakipb2, Aug 6, 2024)
95c9d44  Remove unwanted code (pinakipb2, Aug 6, 2024)
7dda052  Updates (pinakipb2, Aug 6, 2024)
e2f0be7  Merge remote-tracking branch 'origin/master' into pb-md5-vulnerability (pinakipb2, Aug 6, 2024)
2 changes: 2 additions & 0 deletions docker/datahub-gms/env/docker-without-neo4j.env
@@ -23,6 +23,8 @@ PE_CONSUMER_ENABLED=true
UI_INGESTION_ENABLED=true
ENTITY_SERVICE_ENABLE_RETENTION=true

ELASTIC_ID_HASH_ALGO=MD5

# Uncomment to disable persistence of client-side analytics events
# DATAHUB_ANALYTICS_ENABLED=false

2 changes: 2 additions & 0 deletions docker/datahub-gms/env/docker.env
@@ -27,6 +27,8 @@ MCE_CONSUMER_ENABLED=true
PE_CONSUMER_ENABLED=true
UI_INGESTION_ENABLED=true

ELASTIC_ID_HASH_ALGO=MD5

# Uncomment to enable Metadata Service Authentication
METADATA_SERVICE_AUTH_ENABLED=false

1 change: 1 addition & 0 deletions docker/quickstart/docker-compose-m1.quickstart.yml
@@ -17,6 +17,7 @@ services:
- KAFKA_CONFLUENT_SUPPORT_METRICS_ENABLE=false
- KAFKA_MESSAGE_MAX_BYTES=5242880
- KAFKA_MAX_MESSAGE_BYTES=5242880
- ELASTIC_ID_HASH_ALGO=MD5
healthcheck:
interval: 1s
retries: 5
@@ -17,6 +17,7 @@ services:
- KAFKA_CONFLUENT_SUPPORT_METRICS_ENABLE=false
- KAFKA_MESSAGE_MAX_BYTES=5242880
- KAFKA_MAX_MESSAGE_BYTES=5242880
- ELASTIC_ID_HASH_ALGO=MD5
healthcheck:
interval: 1s
retries: 5
@@ -17,6 +17,7 @@ services:
- KAFKA_CONFLUENT_SUPPORT_METRICS_ENABLE=false
- KAFKA_MESSAGE_MAX_BYTES=5242880
- KAFKA_MAX_MESSAGE_BYTES=5242880
- ELASTIC_ID_HASH_ALGO=MD5
healthcheck:
interval: 1s
retries: 5
@@ -19,6 +19,7 @@ services:
- ES_BULK_REFRESH_POLICY=WAIT_UNTIL
- GRAPH_SERVICE_IMPL=elasticsearch
- ENTITY_REGISTRY_CONFIG_PATH=/datahub/datahub-mae-consumer/resources/entity-registry.yml
- ELASTIC_ID_HASH_ALGO=MD5
hostname: datahub-mae-consumer
image: ${DATAHUB_MAE_CONSUMER_IMAGE:-acryldata/datahub-mae-consumer}:${DATAHUB_VERSION:-head}
ports:
1 change: 1 addition & 0 deletions docker/quickstart/docker-compose.consumers.quickstart.yml
@@ -26,6 +26,7 @@ services:
- NEO4J_PASSWORD=datahub
- GRAPH_SERVICE_IMPL=neo4j
- ENTITY_REGISTRY_CONFIG_PATH=/datahub/datahub-mae-consumer/resources/entity-registry.yml
- ELASTIC_ID_HASH_ALGO=MD5
hostname: datahub-mae-consumer
image: ${DATAHUB_MAE_CONSUMER_IMAGE:-acryldata/datahub-mae-consumer}:${DATAHUB_VERSION:-head}
ports:
1 change: 1 addition & 0 deletions docker/quickstart/docker-compose.quickstart.yml
@@ -17,6 +17,7 @@ services:
- KAFKA_CONFLUENT_SUPPORT_METRICS_ENABLE=false
- KAFKA_MESSAGE_MAX_BYTES=5242880
- KAFKA_MAX_MESSAGE_BYTES=5242880
- ELASTIC_ID_HASH_ALGO=MD5
healthcheck:
interval: 1s
retries: 5
@@ -72,8 +72,9 @@ public String toDocId() {
  }

  try {
+   String hashAlgo = System.getenv("ELASTIC_ID_HASH_ALGO");
    byte[] bytesOfRawDocID = rawDocId.toString().getBytes(StandardCharsets.UTF_8);
-   MessageDigest md = MessageDigest.getInstance("MD5");
+   MessageDigest md = MessageDigest.getInstance(hashAlgo);
    byte[] thedigest = md.digest(bytesOfRawDocID);
    return Base64.getEncoder().encodeToString(thedigest);
  } catch (NoSuchAlgorithmException e) {
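For context, here is a minimal standalone sketch of the doc-id computation this hunk makes configurable. The `DocIdHasher` class name and the `getOrDefault` fallback are illustrative additions; the hunk itself reads the variable directly and lets `NoSuchAlgorithmException` reach the existing catch block.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Base64;

public final class DocIdHasher {
  // As in the diff: digest the raw doc id with the algorithm named in
  // ELASTIC_ID_HASH_ALGO (any JCA digest name, e.g. "MD5" or "SHA-256"),
  // then Base64-encode the digest to form the document id.
  public static String toDocId(String rawDocId) {
    String hashAlgo = System.getenv().getOrDefault("ELASTIC_ID_HASH_ALGO", "MD5");
    try {
      MessageDigest md = MessageDigest.getInstance(hashAlgo);
      byte[] digest = md.digest(rawDocId.getBytes(StandardCharsets.UTF_8));
      return Base64.getEncoder().encodeToString(digest);
    } catch (NoSuchAlgorithmException e) {
      throw new IllegalStateException("Unsupported hash algorithm: " + hashAlgo, e);
    }
  }
}
```

One operational consequence worth keeping in mind: changing the algorithm changes every generated doc id, so documents written under MD5 will not be matched by writes issued under SHA-256, and existing indexes would need to be rebuilt.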
1 change: 1 addition & 0 deletions metadata-io/build.gradle
@@ -130,6 +130,7 @@ test {
// override, testng controlling parallelization
// increasing >1 will merely run all tests extra times
maxParallelForks = 1
environment "ELASTIC_ID_HASH_ALGO", "MD5"
}
useTestNG() {
suites 'src/test/resources/testng.xml'
@@ -2,6 +2,7 @@

import com.fasterxml.jackson.databind.node.JsonNodeFactory;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableMap;
import com.linkedin.common.urn.Urn;
import com.linkedin.metadata.run.AspectRowSummary;
@@ -42,6 +43,7 @@
import org.opensearch.search.aggregations.bucket.terms.ParsedStringTerms;
import org.opensearch.search.aggregations.bucket.terms.Terms;
import org.opensearch.search.aggregations.metrics.ParsedMax;
import org.springframework.beans.factory.annotation.Value;

@Slf4j
@RequiredArgsConstructor
@@ -71,6 +73,14 @@ public class ElasticSearchSystemMetadataService
FIELD_REGISTRY_NAME,
FIELD_REGISTRY_VERSION));

@Value("${elasticsearch.idHashAlgo}")
private String hashAlgo;

@VisibleForTesting
public void setIdHashAlgo(String algo) {
hashAlgo = algo;
}

private String toDocument(SystemMetadata systemMetadata, String urn, String aspect) {
final ObjectNode document = JsonNodeFactory.instance.objectNode();

@@ -86,10 +96,10 @@ private String toDocument(SystemMetadata systemMetadata, String urn, String aspe

  private String toDocId(@Nonnull final String urn, @Nonnull final String aspect) {
    String rawDocId = urn + DOC_DELIMETER + aspect;
-
+   String hashAlgo = System.getenv("ELASTIC_ID_HASH_ALGO");
    try {
      byte[] bytesOfRawDocID = rawDocId.getBytes(StandardCharsets.UTF_8);
-     MessageDigest md = MessageDigest.getInstance("MD5");
+     MessageDigest md = MessageDigest.getInstance(hashAlgo);
      byte[] thedigest = md.digest(bytesOfRawDocID);
      return Base64.getEncoder().encodeToString(thedigest);
    } catch (NoSuchAlgorithmException e) {
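An editorial note for reviewers, not part of the diff: Spring resolves `@Value` per bean instance, so the instance field added above will be populated, but the same annotation on a static field (as in the `TimeseriesAspectTransformer` change below) is skipped and the field stays null. A minimal sketch of the distinction, with hypothetical names:

```java
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

@Component
public class HashAlgoHolder {
  // Populated during bean initialization from the resolved property.
  @Value("${elasticsearch.idHashAlgo:MD5}")
  private String instanceAlgo;

  // Not populated: Spring's annotation post-processing skips static fields,
  // so this stays null unless set another way (a setter, or reading getenv).
  @Value("${elasticsearch.idHashAlgo:MD5}")
  private static String staticAlgo;
}
```

This is consistent with the transformer's `getDocId` below falling back to `System.getenv` rather than relying on the injected field.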
@@ -10,6 +10,7 @@
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.databind.node.JsonNodeFactory;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.common.annotations.VisibleForTesting;
import com.linkedin.common.urn.Urn;
import com.linkedin.data.DataMap;
import com.linkedin.data.schema.ArrayDataSchema;
@@ -32,6 +33,7 @@
import javax.annotation.Nullable;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.codec.digest.DigestUtils;
import org.springframework.beans.factory.annotation.Value;

/** Class that provides a utility function that transforms the timeseries aspect into a document */
@Slf4j
@@ -48,7 +50,15 @@ public class TimeseriesAspectTransformer {
        .setStreamReadConstraints(StreamReadConstraints.builder().maxStringLength(maxSize).build());
  }

- private TimeseriesAspectTransformer() {}
+ public TimeseriesAspectTransformer() {}
+
+ @Value("${elasticsearch.idHashAlgo}")
+ private static String hashAlgo;
+
+ @VisibleForTesting
+ public void setIdHashAlgo(String algo) {
+   hashAlgo = algo;
+ }

public static Map<String, JsonNode> transform(
@Nonnull final Urn urn,
@@ -257,7 +267,9 @@ private static Pair<String, ObjectNode> getTimeseriesFieldCollectionDocument(
        finalDocument);
  }

- private static String getDocId(@Nonnull JsonNode document, String collectionId) {
+ private static String getDocId(@Nonnull JsonNode document, String collectionId)
+     throws IllegalArgumentException {
+   String hashAlgo = System.getenv("ELASTIC_ID_HASH_ALGO");
    String docId = document.get(MappingsBuilder.TIMESTAMP_MILLIS_FIELD).toString();
    JsonNode eventGranularity = document.get(MappingsBuilder.EVENT_GRANULARITY);
    if (eventGranularity != null) {
@@ -276,6 +288,11 @@ private static String getDocId(@Nonnull JsonNode document, String collectionId)
      docId += partitionSpec.toString();
    }

-   return DigestUtils.md5Hex(docId);
+   if (hashAlgo.equalsIgnoreCase("SHA-256")) {
+     return DigestUtils.sha256Hex(docId);
+   } else if (hashAlgo.equalsIgnoreCase("MD5")) {
+     return DigestUtils.md5Hex(docId);
+   }
+   throw new IllegalArgumentException("Hash function not handled !");
  }
}
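A condensed sketch of the branching added above; the standalone class and method names are illustrative. Two details stand out from the diffs: this timeseries path emits hex strings via commons-codec, while the `toDocId` paths shown earlier Base64-encode a raw `MessageDigest`, and only the literal names "MD5" and "SHA-256" are accepted here, whereas the `MessageDigest` paths accept any JCA algorithm name.

```java
import org.apache.commons.codec.digest.DigestUtils;

public final class TimeseriesDocIdHasher {
  // Hex-encode the doc id with the configured algorithm; any name other
  // than MD5 or SHA-256 is rejected, matching the behavior in the diff.
  static String hashDocId(String docId, String hashAlgo) {
    if ("SHA-256".equalsIgnoreCase(hashAlgo)) {
      return DigestUtils.sha256Hex(docId);
    } else if ("MD5".equalsIgnoreCase(hashAlgo)) {
      return DigestUtils.md5Hex(docId);
    }
    throw new IllegalArgumentException("Hash function not handled: " + hashAlgo);
  }
}
```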
@@ -39,6 +39,8 @@ public abstract class SystemMetadataServiceTestBase extends AbstractTestNGSpring
@BeforeClass
public void setup() {
_client = buildService();
_client.setIdHashAlgo("MD5");
_client.configure();
_client.reindexAll(Collections.emptySet());
}

@@ -151,8 +151,10 @@ private ElasticSearchTimeseriesAspectService buildService() {
  */

  private void upsertDocument(TestEntityProfile dp, Urn urn) throws JsonProcessingException {
+   TimeseriesAspectTransformer timeseriesAspectTransformer = new TimeseriesAspectTransformer();
+   timeseriesAspectTransformer.setIdHashAlgo("MD5");
    Map<String, JsonNode> documents =
-       TimeseriesAspectTransformer.transform(urn, dp, aspectSpec, null);
+       timeseriesAspectTransformer.transform(urn, dp, aspectSpec, null);
Review comment (Codebase Verification):

Ensure the idHashAlgo configuration is utilized in the TimeseriesAspectTransformer class.

The idHashAlgo field is set using the @Value annotation, but there is no evidence that it is being used in the transformation logic. This could lead to the hash algorithm configuration not being applied as intended.

  • metadata-io/src/main/java/com/linkedin/metadata/timeseries/transformer/TimeseriesAspectTransformer.java: Ensure the idHashAlgo field is used in the transformation methods.
Analysis chain

LGTM! But verify the usage of the new configuration.

The changes to set the hash algorithm to "MD5" in the upsertDocument method are correct.

However, ensure that the new configuration is correctly utilized throughout the codebase.

Scripts executed

The following scripts were executed for the analysis:


Script:

#!/bin/bash
# Description: Verify the usage of the new hash algorithm configuration in the codebase.

# Test: Search for the usage of the new hash algorithm configuration. Expect: Correct utilization of the new configuration.
rg --type java $'setIdHashAlgo'

Length of output: 612



Script:

#!/bin/bash
# Search for the usage of the idHashAlgo field or variable within the TimeseriesAspectTransformer class.
ast-grep --lang java --pattern $'class TimeseriesAspectTransformer {
  $$$
  private String idHashAlgo;
  $$$
}' | xargs -I {} rg 'idHashAlgo' {}

Length of output: 148



Script:

#!/bin/bash
# List all methods and fields within the TimeseriesAspectTransformer class to verify the presence and usage of idHashAlgo.
ast-grep --lang java --pattern $'class TimeseriesAspectTransformer {
  $$$
}' | xargs -I {} rg -A 20 'class TimeseriesAspectTransformer' '{}'

Length of output: 144



Script:

#!/bin/bash
# Search for all instances of idHashAlgo in the entire repository to verify its usage.
rg 'idHashAlgo'

Length of output: 411



Script:

#!/bin/bash
# Search for the usage of the idHashAlgo field within the TimeseriesAspectTransformer class to confirm it is being used in the transformation logic.
rg 'idHashAlgo' metadata-io/src/main/java/com/linkedin/metadata/timeseries/transformer/TimeseriesAspectTransformer.java

Length of output: 159

assertEquals(documents.size(), 3);
documents.forEach(
(key, value) ->
@@ -174,6 +174,7 @@ elasticsearch:
opensearchUseAwsIamAuth: ${OPENSEARCH_USE_AWS_IAM_AUTH:false}
region: ${AWS_REGION:#{null}}
implementation: ${ELASTICSEARCH_IMPLEMENTATION:elasticsearch} # elasticsearch or opensearch, for handling divergent cases
idHashAlgo: ${ELASTIC_ID_HASH_ALGO:MD5}
sslContext: # Required if useSSL is true
protocol: ${ELASTICSEARCH_SSL_PROTOCOL:#{null}}
secureRandomImplementation: ${ELASTICSEARCH_SSL_SECURE_RANDOM_IMPL:#{null}}
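The `${ELASTIC_ID_HASH_ALGO:MD5}` placeholder keeps MD5 as the default while letting a deployment export `ELASTIC_ID_HASH_ALGO=SHA-256` to opt in. Since `MessageDigest.getInstance` throws for names the JVM does not recognize, here is a small illustrative check of candidate values before overriding the default:

```java
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

public final class AlgoCheck {
  public static void main(String[] args) {
    // Verify candidate ELASTIC_ID_HASH_ALGO values against this JVM's providers.
    for (String algo : new String[] {"MD5", "SHA-256"}) {
      try {
        MessageDigest.getInstance(algo);
        System.out.println(algo + ": available");
      } catch (NoSuchAlgorithmException e) {
        System.out.println(algo + ": NOT available");
      }
    }
  }
}
```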
6 changes: 6 additions & 0 deletions smoke-test/build.gradle
@@ -89,6 +89,7 @@ task noCypressSuite0(type: Exec, dependsOn: [installDev, ':metadata-ingestion:in
environment 'DATAHUB_KAFKA_SCHEMA_REGISTRY_URL', 'http://localhost:8080/schema-registry/api/'
environment 'KAFKA_BROKER_CONTAINER', 'datahub-kafka-broker-1'
environment 'TEST_STRATEGY', 'no_cypress_suite0'
environment "ELASTIC_ID_HASH_ALGO", "MD5"

workingDir = project.projectDir
commandLine 'bash', '-c',
@@ -101,6 +102,7 @@ task noCypressSuite1(type: Exec, dependsOn: [installDev, ':metadata-ingestion:in
environment 'DATAHUB_KAFKA_SCHEMA_REGISTRY_URL', 'http://localhost:8080/schema-registry/api/'
environment 'KAFKA_BROKER_CONTAINER', 'datahub-kafka-broker-1'
environment 'TEST_STRATEGY', 'no_cypress_suite1'
environment "ELASTIC_ID_HASH_ALGO", "MD5"

workingDir = project.projectDir
commandLine 'bash', '-c',
@@ -113,6 +115,7 @@ task cypressSuite1(type: Exec, dependsOn: [installDev, ':metadata-ingestion:inst
environment 'DATAHUB_KAFKA_SCHEMA_REGISTRY_URL', 'http://localhost:8080/schema-registry/api/'
environment 'KAFKA_BROKER_CONTAINER', 'datahub-kafka-broker-1'
environment 'TEST_STRATEGY', 'cypress_suite1'
environment "ELASTIC_ID_HASH_ALGO", "MD5"

workingDir = project.projectDir
commandLine 'bash', '-c',
@@ -125,6 +128,7 @@ task cypressRest(type: Exec, dependsOn: [installDev, ':metadata-ingestion:instal
environment 'DATAHUB_KAFKA_SCHEMA_REGISTRY_URL', 'http://localhost:8080/schema-registry/api/'
environment 'KAFKA_BROKER_CONTAINER', 'datahub-kafka-broker-1'
environment 'TEST_STRATEGY', 'cypress_rest'
environment "ELASTIC_ID_HASH_ALGO", "MD5"

workingDir = project.projectDir
commandLine 'bash', '-c',
@@ -139,6 +143,7 @@ task cypressDev(type: Exec, dependsOn: [installDev, ':metadata-ingestion:install
environment 'RUN_QUICKSTART', 'false'
environment 'DATAHUB_KAFKA_SCHEMA_REGISTRY_URL', 'http://localhost:8080/schema-registry/api/'
environment 'KAFKA_BROKER_CONTAINER', 'datahub-kafka-broker-1'
environment "ELASTIC_ID_HASH_ALGO", "MD5"

workingDir = project.projectDir
commandLine 'bash', '-c',
@@ -154,6 +159,7 @@ task cypressData(type: Exec, dependsOn: [installDev, ':metadata-ingestion:instal
environment 'DATAHUB_KAFKA_SCHEMA_REGISTRY_URL', 'http://localhost:8080/schema-registry/api/'
environment 'KAFKA_BROKER_CONTAINER', 'datahub-kafka-broker-1'
environment 'RUN_UI', 'false'
environment "ELASTIC_ID_HASH_ALGO", "MD5"

workingDir = project.projectDir
commandLine 'bash', '-c',
1 change: 1 addition & 0 deletions smoke-test/cypress-dev.sh
@@ -12,6 +12,7 @@ source venv/bin/activate

export KAFKA_BROKER_CONTAINER="datahub-kafka-broker-1"
export KAFKA_BOOTSTRAP_SERVER="broker:9092"
export ELASTIC_ID_HASH_ALGO="MD5"
python -c 'from tests.cypress.integration_test import ingest_data; ingest_data()'

cd tests/cypress
1 change: 1 addition & 0 deletions smoke-test/run-quickstart.sh
@@ -15,6 +15,7 @@ DATAHUB_SEARCH_TAG="${DATAHUB_SEARCH_TAG:=2.9.0}"
XPACK_SECURITY_ENABLED="${XPACK_SECURITY_ENABLED:=plugins.security.disabled=true}"
ELASTICSEARCH_USE_SSL="${ELASTICSEARCH_USE_SSL:=false}"
USE_AWS_ELASTICSEARCH="${USE_AWS_ELASTICSEARCH:=true}"
ELASTIC_ID_HASH_ALGO="${ELASTIC_ID_HASH_ALGO:=MD5}"

echo "DATAHUB_VERSION = $DATAHUB_VERSION"
DATAHUB_TELEMETRY_ENABLED=false \
1 change: 1 addition & 0 deletions smoke-test/smoke.sh
@@ -25,6 +25,7 @@ source venv/bin/activate
source ./set-cypress-creds.sh

export DATAHUB_GMS_URL=http://localhost:8080
export ELASTIC_ID_HASH_ALGO="MD5"

# no_cypress_suite0, no_cypress_suite1, cypress_suite1, cypress_rest
if [[ -z "${TEST_STRATEGY}" ]]; then