diff --git a/.github/workflows/beam_PerformanceTests_Kafka_IO.yml b/.github/workflows/beam_PerformanceTests_Kafka_IO.yml index 39e49db09196..1f0b71cd03d3 100644 --- a/.github/workflows/beam_PerformanceTests_Kafka_IO.yml +++ b/.github/workflows/beam_PerformanceTests_Kafka_IO.yml @@ -54,7 +54,7 @@ jobs: github.event_name == 'workflow_dispatch' || (github.event_name == 'schedule' && github.repository == 'apache/beam') || github.event.comment.body == 'Run Java KafkaIO Performance Test' - runs-on: [self-hosted, ubuntu-20.04, main] + runs-on: [self-hosted, ubuntu-20.04, highmem] timeout-minutes: 120 name: ${{ matrix.job_name }} (${{ matrix.job_phrase }}) strategy: diff --git a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOIT.java b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOIT.java index cef3bc80d613..adf31dc72b54 100644 --- a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOIT.java +++ b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOIT.java @@ -94,6 +94,7 @@ import org.apache.kafka.clients.admin.AdminClient; import org.apache.kafka.clients.admin.NewPartitions; import org.apache.kafka.clients.admin.NewTopic; +import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.serialization.ByteArrayDeserializer; import org.apache.kafka.common.serialization.ByteArraySerializer; @@ -142,6 +143,16 @@ public class KafkaIOIT { private static final Logger LOG = LoggerFactory.getLogger(KafkaIOIT.class); + private static final int RETRIES_CONFIG = 10; + + private static final int REQUEST_TIMEOUT_MS_CONFIG = 600000; + + private static final int MAX_BLOCK_MS_CONFIG = 300000; + + private static final int BUFFER_MEMORY_CONFIG = 100554432; + + private static final int RETRY_BACKOFF_MS_CONFIG = 5000; + private static SyntheticSourceOptions sourceOptions; private static Options options; @@ -938,7 +949,14 @@ private KafkaIO.Write writeToKafka() { return KafkaIO.write() .withBootstrapServers(options.getKafkaBootstrapServerAddresses()) .withKeySerializer(ByteArraySerializer.class) - .withValueSerializer(ByteArraySerializer.class); + .withValueSerializer(ByteArraySerializer.class) + .withProducerConfigUpdates( + ImmutableMap.of( + ProducerConfig.RETRIES_CONFIG, RETRIES_CONFIG, + ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, REQUEST_TIMEOUT_MS_CONFIG, + ProducerConfig.MAX_BLOCK_MS_CONFIG, MAX_BLOCK_MS_CONFIG, + ProducerConfig.BUFFER_MEMORY_CONFIG, BUFFER_MEMORY_CONFIG, + ProducerConfig.RETRY_BACKOFF_MS_CONFIG, RETRY_BACKOFF_MS_CONFIG)); } private KafkaIO.Read readFromBoundedKafka() {