-
Notifications
You must be signed in to change notification settings - Fork 3k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
refactor(restli-mce-consumer) (#6744)
* fix(security): commons-text in frontend * refactor(restli): set threads based on cpu cores feat(mce-consumers): hit local restli endpoint * testing docker build * Add retry configuration options for entity client * Kafka debugging * fix(kafka-setup): parallelize topic creation * Adjust docker build * Docker build updates * WIP * fix(lint): metadata-ingestion lint * fix(gradle-docker): fix docker frontend dep * fix(elastic): fix race condition between gms and mae for index creation * Revert "fix(elastic): fix race condition between gms and mae for index creation" This reverts commit 9629d12. * fix(test): fix datahub frontend test for clean/test cycle * fix(test): datahub-frontend missing assets in test * fix(security): set protobuf lib datahub-upgrade & mce/mae-consumer * gitingore update * fix(docker): remove platform on docker base image, set by buildx * refactor(kafka-producer): update kafka producer tracking/logging * updates per PR feedback * Add documentation around mce standalone consumer Kafka consumer concurrency to follow thread count for restli & sql connection pool Co-authored-by: leifker <[email protected]> Co-authored-by: Pedro Silva <[email protected]>
- Loading branch information
1 parent
6fdbf6b
commit ecc01b9
Showing
73 changed files
with
1,401 additions
and
473 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,107 @@ | ||
package client; | ||
|
||
import com.typesafe.config.Config; | ||
import org.apache.kafka.clients.CommonClientConfigs; | ||
import org.apache.kafka.clients.producer.KafkaProducer; | ||
import org.apache.kafka.clients.producer.ProducerConfig; | ||
import org.apache.kafka.clients.producer.ProducerRecord; | ||
import org.apache.kafka.common.config.SaslConfigs; | ||
import org.apache.kafka.common.config.SslConfigs; | ||
import org.apache.kafka.common.security.auth.SecurityProtocol; | ||
import org.slf4j.Logger; | ||
import org.slf4j.LoggerFactory; | ||
import play.api.inject.ApplicationLifecycle; | ||
import utils.ConfigUtil; | ||
|
||
import javax.inject.Inject; | ||
|
||
import javax.annotation.Nonnull; | ||
import javax.inject.Singleton; | ||
import java.util.Arrays; | ||
import java.util.Collections; | ||
import java.util.List; | ||
import java.util.Optional; | ||
import java.util.Properties; | ||
import java.util.concurrent.CompletableFuture; | ||
|
||
@Singleton | ||
public class KafkaTrackingProducer { | ||
private final Logger _logger = LoggerFactory.getLogger(KafkaTrackingProducer.class.getName()); | ||
private static final List<String> KAFKA_SSL_PROTOCOLS = Collections.unmodifiableList( | ||
Arrays.asList(SecurityProtocol.SSL.name(), SecurityProtocol.SASL_SSL.name(), | ||
SecurityProtocol.SASL_PLAINTEXT.name())); | ||
|
||
private final Boolean _isEnabled; | ||
private final KafkaProducer<String, String> _producer; | ||
|
||
@Inject | ||
public KafkaTrackingProducer(@Nonnull Config config, ApplicationLifecycle lifecycle) { | ||
_isEnabled = !config.hasPath("analytics.enabled") || config.getBoolean("analytics.enabled"); | ||
|
||
if (_isEnabled) { | ||
_logger.debug("Analytics tracking is enabled"); | ||
_producer = createKafkaProducer(config); | ||
|
||
lifecycle.addStopHook( | ||
() -> { | ||
_producer.flush(); | ||
_producer.close(); | ||
return CompletableFuture.completedFuture(null); | ||
}); | ||
} else { | ||
_logger.debug("Analytics tracking is disabled"); | ||
_producer = null; | ||
} | ||
} | ||
|
||
public Boolean isEnabled() { | ||
return _isEnabled; | ||
} | ||
|
||
public void send(ProducerRecord<String, String> record) { | ||
_producer.send(record); | ||
} | ||
|
||
private static KafkaProducer createKafkaProducer(Config config) { | ||
final Properties props = new Properties(); | ||
props.put(ProducerConfig.CLIENT_ID_CONFIG, "datahub-frontend"); | ||
props.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, config.getString("analytics.kafka.delivery.timeout.ms")); | ||
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, config.getString("analytics.kafka.bootstrap.server")); | ||
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); // Actor urn. | ||
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); // JSON object. | ||
|
||
final String securityProtocolConfig = "analytics.kafka.security.protocol"; | ||
if (config.hasPath(securityProtocolConfig) | ||
&& KAFKA_SSL_PROTOCOLS.contains(config.getString(securityProtocolConfig))) { | ||
props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, config.getString(securityProtocolConfig)); | ||
setConfig(config, props, SslConfigs.SSL_KEY_PASSWORD_CONFIG, "analytics.kafka.ssl.key.password"); | ||
|
||
setConfig(config, props, SslConfigs.SSL_KEYSTORE_TYPE_CONFIG, "analytics.kafka.ssl.keystore.type"); | ||
setConfig(config, props, SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, "analytics.kafka.ssl.keystore.location"); | ||
setConfig(config, props, SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, "analytics.kafka.ssl.keystore.password"); | ||
|
||
setConfig(config, props, SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG, "analytics.kafka.ssl.truststore.type"); | ||
setConfig(config, props, SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, "analytics.kafka.ssl.truststore.location"); | ||
setConfig(config, props, SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, "analytics.kafka.ssl.truststore.password"); | ||
|
||
setConfig(config, props, SslConfigs.SSL_PROTOCOL_CONFIG, "analytics.kafka.ssl.protocol"); | ||
setConfig(config, props, SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, "analytics.kafka.ssl.endpoint.identification.algorithm"); | ||
|
||
final String securityProtocol = config.getString(securityProtocolConfig); | ||
if (securityProtocol.equals(SecurityProtocol.SASL_SSL.name()) | ||
|| securityProtocol.equals(SecurityProtocol.SASL_PLAINTEXT.name())) { | ||
setConfig(config, props, SaslConfigs.SASL_MECHANISM, "analytics.kafka.sasl.mechanism"); | ||
setConfig(config, props, SaslConfigs.SASL_JAAS_CONFIG, "analytics.kafka.sasl.jaas.config"); | ||
setConfig(config, props, SaslConfigs.SASL_KERBEROS_SERVICE_NAME, "analytics.kafka.sasl.kerberos.service.name"); | ||
setConfig(config, props, SaslConfigs.SASL_LOGIN_CALLBACK_HANDLER_CLASS, "analytics.kafka.sasl.login.callback.handler.class"); | ||
} | ||
} | ||
|
||
return new org.apache.kafka.clients.producer.KafkaProducer<String, String>(props); | ||
} | ||
|
||
private static void setConfig(Config config, Properties props, String key, String configKey) { | ||
Optional.ofNullable(ConfigUtil.getString(config, configKey, null)) | ||
.ifPresent(v -> props.put(key, v)); | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.