Skip to content

Commit

Permalink
[HUDI-7086] Fix the default for gcp pub sub max sync time to 1min (ap…
Browse files Browse the repository at this point in the history
…ache#10171)

Co-authored-by: rmahindra123 <[email protected]>
  • Loading branch information
2 people authored and nsivabalan committed Nov 29, 2023
1 parent 2a0f18b commit dbeda41
Show file tree
Hide file tree
Showing 3 changed files with 13 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -147,12 +147,12 @@ public class CloudSourceConfig extends HoodieConfig {
.sinceVersion("0.14.1")
.withDocumentation("specify this value in bytes, to coalesce partitions of source dataset not greater than specified limit");

public static final ConfigProperty<Integer> MAX_FETCH_TIME_PER_SYNC_MS = ConfigProperty
.key(STREAMER_CONFIG_PREFIX + "source.cloud.meta.max.fetch.time.per.sync.ms")
.defaultValue(1)
public static final ConfigProperty<Integer> MAX_FETCH_TIME_PER_SYNC_SECS = ConfigProperty
.key(STREAMER_CONFIG_PREFIX + "source.cloud.meta.max.fetch.time.per.sync.secs")
.defaultValue(60)
.markAdvanced()
.sinceVersion("0.14.1")
.withDocumentation("Max time in millis to consume " + MAX_NUM_MESSAGES_PER_SYNC.key() + " messages from cloud queue. Cloud event queues like SQS, "
.withDocumentation("Max time in secs to consume " + MAX_NUM_MESSAGES_PER_SYNC.key() + " messages from cloud queue. Cloud event queues like SQS, "
+ "PubSub can return empty responses even when messages are available the queue, this config ensures we don't wait forever "
+ "to consume MAX_MESSAGES_CONF messages, but time out and move on further.");
}
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@
import static org.apache.hudi.common.util.ConfigUtils.getStringWithAltKeys;
import static org.apache.hudi.utilities.config.CloudSourceConfig.ACK_MESSAGES;
import static org.apache.hudi.utilities.config.CloudSourceConfig.BATCH_SIZE_CONF;
import static org.apache.hudi.utilities.config.CloudSourceConfig.MAX_FETCH_TIME_PER_SYNC_MS;
import static org.apache.hudi.utilities.config.CloudSourceConfig.MAX_FETCH_TIME_PER_SYNC_SECS;
import static org.apache.hudi.utilities.config.CloudSourceConfig.MAX_NUM_MESSAGES_PER_SYNC;
import static org.apache.hudi.utilities.config.GCSEventsSourceConfig.GOOGLE_PROJECT_ID;
import static org.apache.hudi.utilities.config.GCSEventsSourceConfig.PUBSUB_SUBSCRIPTION_ID;
Expand Down Expand Up @@ -121,7 +121,7 @@ public GcsEventsSource(TypedProperties props, JavaSparkContext jsc, SparkSession
getStringWithAltKeys(props, PUBSUB_SUBSCRIPTION_ID),
getIntWithAltKeys(props, BATCH_SIZE_CONF),
getIntWithAltKeys(props, MAX_NUM_MESSAGES_PER_SYNC),
getIntWithAltKeys(props, MAX_FETCH_TIME_PER_SYNC_MS))
getIntWithAltKeys(props, MAX_FETCH_TIME_PER_SYNC_SECS))
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,21 +56,21 @@ public class PubsubMessagesFetcher {

private final int batchSize;
private final int maxMessagesPerSync;
private final long maxFetchTimePerSync;
private final long maxFetchTimePerSyncSecs;
private final SubscriberStubSettings subscriberStubSettings;
private final PubsubQueueClient pubsubQueueClient;

private static final Logger LOG = LoggerFactory.getLogger(PubsubMessagesFetcher.class);

public PubsubMessagesFetcher(String googleProjectId, String pubsubSubscriptionId, int batchSize,
int maxMessagesPerSync,
long maxFetchTimePerSync,
long maxFetchTimePerSyncSecs,
PubsubQueueClient pubsubQueueClient) {
this.googleProjectId = googleProjectId;
this.pubsubSubscriptionId = pubsubSubscriptionId;
this.batchSize = batchSize;
this.maxMessagesPerSync = maxMessagesPerSync;
this.maxFetchTimePerSync = maxFetchTimePerSync;
this.maxFetchTimePerSyncSecs = maxFetchTimePerSyncSecs;

try {
/** For details of timeout and retry configs,
Expand All @@ -94,13 +94,13 @@ public PubsubMessagesFetcher(
String pubsubSubscriptionId,
int batchSize,
int maxMessagesPerSync,
long maxFetchTimePerSync) {
long maxFetchTimePerSyncSecs) {
this(
googleProjectId,
pubsubSubscriptionId,
batchSize,
maxMessagesPerSync,
maxFetchTimePerSync,
maxFetchTimePerSyncSecs,
new PubsubQueueClient()
);
}
Expand All @@ -112,7 +112,8 @@ public List<ReceivedMessage> fetchMessages() {
long startTime = System.currentTimeMillis();
long unAckedMessages = pubsubQueueClient.getNumUnAckedMessages(this.pubsubSubscriptionId);
LOG.info("Found unacked messages " + unAckedMessages);
while (messageList.size() < unAckedMessages && messageList.size() < maxMessagesPerSync && (System.currentTimeMillis() - startTime < maxFetchTimePerSync)) {
while (messageList.size() < unAckedMessages && messageList.size() < maxMessagesPerSync
&& ((System.currentTimeMillis() - startTime) < (maxFetchTimePerSyncSecs * 1000))) {
PullResponse pullResponse = pubsubQueueClient.makePullRequest(subscriber, subscriptionName, batchSize);
messageList.addAll(pullResponse.getReceivedMessagesList());
}
Expand Down

0 comments on commit dbeda41

Please sign in to comment.