Skip to content

Commit

Permalink
[flink] Improve Exception message for consumer without expire time
Browse files Browse the repository at this point in the history
  • Loading branch information
JingsongLi committed Nov 13, 2024
1 parent 72c25d5 commit 787a981
Show file tree
Hide file tree
Showing 2 changed files with 7 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -259,7 +259,9 @@ public DataStream<RowData> build() {
if (conf.contains(CoreOptions.CONSUMER_ID)
&& !conf.contains(CoreOptions.CONSUMER_EXPIRATION_TIME)) {
throw new IllegalArgumentException(
"consumer.expiration-time should be specified when using consumer-id.");
"You need to configure 'consumer.expiration-time' (ALTER TABLE) and restart your write job for it"
+ " to take effect, when you need consumer-id feature. This is to prevent consumers from leaving"
+ " too many snapshots that could pose a risk to the file system.");
}

if (sourceBounded) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -972,7 +972,8 @@ public void testConsumerIdExpInBatchMode() {
"SELECT * FROM T /*+ OPTIONS('consumer-id' = 'test-id') */ WHERE a = 1"))
.rootCause()
.isInstanceOf(IllegalArgumentException.class)
.hasMessage("consumer.expiration-time should be specified when using consumer-id.");
.hasMessageContaining(
"You need to configure 'consumer.expiration-time' (ALTER TABLE) and restart your write job for it");
}

@Test
Expand All @@ -985,7 +986,8 @@ public void testConsumerIdExpInStreamingMode() {
streamSqlIter(
"SELECT * FROM T /*+ OPTIONS('consumer-id'='test-id') */"))
.isInstanceOf(IllegalArgumentException.class)
.hasMessage("consumer.expiration-time should be specified when using consumer-id.");
.hasMessageContaining(
"You need to configure 'consumer.expiration-time' (ALTER TABLE) and restart your write job for it");
}

@Test
Expand Down

0 comments on commit 787a981

Please sign in to comment.