Skip to content

Commit

Permalink
[FLINK-13231] [pubsub] Replace Max outstanding acknowledgement ids limit with a FlinkConnectorRateLimiter
Browse files Browse the repository at this point in the history
  • Loading branch information
Xeli authored and becketqin committed Aug 19, 2019
1 parent 2f324f8 commit f539300
Show file tree
Hide file tree
Showing 3 changed files with 37 additions and 27 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
package org.apache.flink.streaming.connectors.gcp.pubsub;

import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.common.io.ratelimiting.FlinkConnectorRateLimiter;
import org.apache.flink.api.common.io.ratelimiting.GuavaFlinkConnectorRateLimiter;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
Expand All @@ -40,8 +42,6 @@
import com.google.pubsub.v1.ProjectSubscriptionName;
import com.google.pubsub.v1.PubsubMessage;
import com.google.pubsub.v1.ReceivedMessage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.io.Serializable;
Expand All @@ -57,13 +57,12 @@
*/
public class PubSubSource<OUT> extends RichSourceFunction<OUT>
implements ResultTypeQueryable<OUT>, ParallelSourceFunction<OUT>, CheckpointListener, ListCheckpointed<AcknowledgeIdsForCheckpoint<String>> {
public static final int NO_MAX_MESSAGES_TO_ACKNOWLEDGE_LIMIT = -1;
private static final Logger LOG = LoggerFactory.getLogger(PubSubSource.class);
protected final PubSubDeserializationSchema<OUT> deserializationSchema;
protected final PubSubSubscriberFactory pubSubSubscriberFactory;
protected final Credentials credentials;
protected final int maxMessagesToAcknowledge;
protected final AcknowledgeOnCheckpointFactory acknowledgeOnCheckpointFactory;
protected final FlinkConnectorRateLimiter rateLimiter;
protected final int messagePerSecondRateLimit;

protected transient AcknowledgeOnCheckpoint<String> acknowledgeOnCheckpoint;
protected transient PubSubSubscriber subscriber;
Expand All @@ -73,13 +72,15 @@ public class PubSubSource<OUT> extends RichSourceFunction<OUT>
PubSubSource(PubSubDeserializationSchema<OUT> deserializationSchema,
PubSubSubscriberFactory pubSubSubscriberFactory,
Credentials credentials,
int maxMessagesToAcknowledge,
AcknowledgeOnCheckpointFactory acknowledgeOnCheckpointFactory) {
AcknowledgeOnCheckpointFactory acknowledgeOnCheckpointFactory,
FlinkConnectorRateLimiter rateLimiter,
int messagePerSecondRateLimit) {
this.deserializationSchema = deserializationSchema;
this.pubSubSubscriberFactory = pubSubSubscriberFactory;
this.credentials = credentials;
this.maxMessagesToAcknowledge = maxMessagesToAcknowledge;
this.acknowledgeOnCheckpointFactory = acknowledgeOnCheckpointFactory;
this.rateLimiter = rateLimiter;
this.messagePerSecondRateLimit = messagePerSecondRateLimit;
}

@Override
Expand All @@ -92,6 +93,10 @@ public void open(Configuration configuration) throws Exception {

getRuntimeContext().getMetricGroup().gauge("PubSubMessagesProcessedNotAcked", this::getOutstandingMessagesToAck);

//convert per-subtask-limit to global rate limit, as FlinkConnectorRateLimiter::setRate expects a global rate limit.
rateLimiter.setRate(messagePerSecondRateLimit * getRuntimeContext().getNumberOfParallelSubtasks());
rateLimiter.open(getRuntimeContext());

createAndSetPubSubSubscriber();
this.isRunning = true;
}
Expand All @@ -104,11 +109,6 @@ private boolean hasNoCheckpointingEnabled(RuntimeContext runtimeContext) {
public void run(SourceContext<OUT> sourceContext) throws Exception {
while (isRunning) {
try {
if (maxMessagesToAcknowledgeLimitReached()) {
LOG.debug("Sleeping because there are {} messages waiting to be ack'ed but limit is {}", getOutstandingMessagesToAck(), maxMessagesToAcknowledge);
Thread.sleep(100);
continue;
}

processMessage(sourceContext, subscriber.pull());
} catch (InterruptedException | CancellationException e) {
Expand All @@ -119,6 +119,8 @@ public void run(SourceContext<OUT> sourceContext) throws Exception {
}

void processMessage(SourceContext<OUT> sourceContext, List<ReceivedMessage> messages) throws Exception {
rateLimiter.acquire(messages.size());

synchronized (sourceContext.getCheckpointLock()) {
for (ReceivedMessage message : messages) {
acknowledgeOnCheckpoint.addAcknowledgeId(message.getAckId());
Expand All @@ -137,10 +139,6 @@ void processMessage(SourceContext<OUT> sourceContext, List<ReceivedMessage> mess
}
}

/**
 * Tells whether the number of outstanding (not yet acknowledged) messages exceeds the configured maximum.
 *
 * @return {@code false} when {@code maxMessagesToAcknowledge} equals {@code NO_MAX_MESSAGES_TO_ACKNOWLEDGE_LIMIT};
 *         otherwise {@code true} only if the outstanding count is strictly greater than {@code maxMessagesToAcknowledge}.
 */
private boolean maxMessagesToAcknowledgeLimitReached() throws Exception {
	// The sentinel value switches the limit off entirely, so the outstanding count is never consulted.
	if (maxMessagesToAcknowledge == NO_MAX_MESSAGES_TO_ACKNOWLEDGE_LIMIT) {
		return false;
	}
	return getOutstandingMessagesToAck() > maxMessagesToAcknowledge;
}

/**
 * Reads the current number of outstanding acknowledgements from {@code acknowledgeOnCheckpoint}.
 *
 * @return the value reported by {@code numberOfOutstandingAcknowledgements()}.
 */
private Integer getOutstandingMessagesToAck() {
	final Integer pendingAcknowledgements = acknowledgeOnCheckpoint.numberOfOutstandingAcknowledgements();
	return pendingAcknowledgements;
}
Expand Down Expand Up @@ -197,6 +195,7 @@ public static class PubSubSourceBuilder<OUT> implements ProjectNameBuilder<OUT>,
private PubSubSubscriberFactory pubSubSubscriberFactory;
private Credentials credentials;
private int maxMessageToAcknowledge = 10000;
private int messagePerSecondRateLimit = 100000;

private PubSubSourceBuilder(DeserializationSchema<OUT> deserializationSchema) {
Preconditions.checkNotNull(deserializationSchema);
Expand Down Expand Up @@ -264,12 +263,13 @@ public PubSubSourceBuilder<OUT> withPubSubSubscriberFactory(int maxMessagesPerPu
}

/**
* Set a limit of the number of outstanding or to-be acknowledged messages.
* default is 10000. Adjust this if you have high checkpoint intervals and / or run into memory issues
* due to the amount of acknowledgement ids. Use {@link PubSubSource}.NO_MAX_MESSAGES_TO_ACKNOWLEDGE_LIMIT if you want to remove the limit.
* Set a limit on the rate of messages per second received. This limit is per parallel instance of the source function.
* Default is set to 100000 messages per second.
*
* @param messagePerSecondRateLimit the message per second rate limit.
*/
public PubSubSourceBuilder<OUT> withMaxMessageToAcknowledge(int maxMessageToAcknowledge) {
this.maxMessageToAcknowledge = maxMessageToAcknowledge;
public PubSubSourceBuilder<OUT> withMessageRateLimit(int messagePerSecondRateLimit) {
this.messagePerSecondRateLimit = messagePerSecondRateLimit;
return this;
}

Expand All @@ -292,7 +292,7 @@ public PubSubSource<OUT> build() throws IOException {
100);
}

return new PubSubSource<>(deserializationSchema, pubSubSubscriberFactory, credentials, maxMessageToAcknowledge, new AcknowledgeOnCheckpointFactory());
return new PubSubSource<>(deserializationSchema, pubSubSubscriberFactory, credentials, new AcknowledgeOnCheckpointFactory(), new GuavaFlinkConnectorRateLimiter(), messagePerSecondRateLimit);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.flink.streaming.connectors.gcp.pubsub;

import org.apache.flink.api.common.io.ratelimiting.FlinkConnectorRateLimiter;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
Expand Down Expand Up @@ -78,6 +79,8 @@ public class PubSubSourceTest {
private Credentials credentials;
@Mock
private PubSubSubscriber pubsubSubscriber;
@Mock
private FlinkConnectorRateLimiter rateLimiter;

private PubSubSource<String> pubSubSource;

Expand All @@ -91,8 +94,9 @@ public void setup() throws Exception {
pubSubSource = new PubSubSource<>(deserializationSchema,
pubSubSubscriberFactory,
credentials,
100,
acknowledgeOnCheckpointFactory);
acknowledgeOnCheckpointFactory,
rateLimiter,
1024);
pubSubSource.setRuntimeContext(streamingRuntimeContext);
}

Expand Down Expand Up @@ -120,10 +124,15 @@ public void testProcessMessage() throws Exception {
when(sourceContext.getCheckpointLock()).thenReturn("some object to lock on");

pubSubSource.open(null);
pubSubSource.processMessage(sourceContext, asList(receivedMessage("firstAckId", pubSubMessage(FIRST_MESSAGE)),
receivedMessage("secondAckId", pubSubMessage(SECOND_MESSAGE))));
List<ReceivedMessage> receivedMessages = asList(
receivedMessage("firstAckId", pubSubMessage(FIRST_MESSAGE)),
receivedMessage("secondAckId", pubSubMessage(SECOND_MESSAGE))
);
pubSubSource.processMessage(sourceContext, receivedMessages);

//verify handling messages
verify(rateLimiter, times(1)).acquire(2);

verify(sourceContext, times(1)).getCheckpointLock();
verify(deserializationSchema, times(1)).isEndOfStream(FIRST_MESSAGE);
verify(deserializationSchema, times(1)).deserialize(pubSubMessage(FIRST_MESSAGE));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ private static void runFlinkJob(String projectName, String subscriptionName, Str
.withDeserializationSchema(new IntegerSerializer())
.withProjectName(projectName)
.withSubscriptionName(subscriptionName)
.withMessageRateLimit(1)
.build())
.map(PubSubExample::printAndReturn).disableChaining()
.addSink(PubSubSink.newBuilder()
Expand Down

0 comments on commit f539300

Please sign in to comment.