From 787a981cc2c009075d8c2aa5f3e2c66d5988aeeb Mon Sep 17 00:00:00 2001 From: Jingsong Date: Wed, 13 Nov 2024 11:32:13 +0800 Subject: [PATCH] [flink] Improve Exception message for consumer without expire time --- .../org/apache/paimon/flink/source/FlinkSourceBuilder.java | 4 +++- .../java/org/apache/paimon/flink/CatalogTableITCase.java | 6 ++++-- 2 files changed, 7 insertions(+), 3 deletions(-) diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkSourceBuilder.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkSourceBuilder.java index ed94043c035d..a648bfba607d 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkSourceBuilder.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkSourceBuilder.java @@ -259,7 +259,9 @@ public DataStream build() { if (conf.contains(CoreOptions.CONSUMER_ID) && !conf.contains(CoreOptions.CONSUMER_EXPIRATION_TIME)) { throw new IllegalArgumentException( - "consumer.expiration-time should be specified when using consumer-id."); + "You need to configure 'consumer.expiration-time' (ALTER TABLE) and restart your write job for it" + + " to take effect, when you need consumer-id feature. This is to prevent consumers from leaving" + + " too many snapshots that could pose a risk to the file system."); } if (sourceBounded) { diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CatalogTableITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CatalogTableITCase.java index ba063248ee46..8a3e068a72a0 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CatalogTableITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CatalogTableITCase.java @@ -972,7 +972,8 @@ public void testConsumerIdExpInBatchMode() { "SELECT * FROM T /*+ OPTIONS('consumer-id' = 'test-id') */ WHERE a = 1")) .rootCause() .isInstanceOf(IllegalArgumentException.class) - .hasMessage("consumer.expiration-time should be specified when using consumer-id."); + .hasMessageContaining( + "You need to configure 'consumer.expiration-time' (ALTER TABLE) and restart your write job for it"); } @Test @@ -985,7 +986,8 @@ public void testConsumerIdExpInStreamingMode() { streamSqlIter( "SELECT * FROM T /*+ OPTIONS('consumer-id'='test-id') */")) .isInstanceOf(IllegalArgumentException.class) - .hasMessage("consumer.expiration-time should be specified when using consumer-id."); + .hasMessageContaining( + "You need to configure 'consumer.expiration-time' (ALTER TABLE) and restart your write job for it"); } @Test