Skip to content

Commit

Permalink
Merge pull request #15530 from cdapio/CDAP-20943
Browse files Browse the repository at this point in the history
[CDAP-20943] Use a separate consumer id for MetadataConsumerSubscriberService
  • Loading branch information
itsankit-google authored Jan 23, 2024
2 parents 2bd5171 + e71b305 commit 485a516
Show file tree
Hide file tree
Showing 3 changed files with 18 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,8 @@
import io.cdap.cdap.data2.metadata.writer.MetadataOperation;
import io.cdap.cdap.data2.metadata.writer.MetadataOperationTypeAdapter;
import io.cdap.cdap.internal.app.store.AppMetadataStore;
import io.cdap.cdap.messaging.spi.MessagingService;
import io.cdap.cdap.messaging.context.MultiThreadMessagingContext;
import io.cdap.cdap.messaging.spi.MessagingService;
import io.cdap.cdap.messaging.subscriber.AbstractMessagingSubscriberService;
import io.cdap.cdap.metadata.MetadataMessageProcessor;
import io.cdap.cdap.proto.ProgramRunStatus;
Expand Down Expand Up @@ -112,7 +112,7 @@ public class MetadataConsumerSubscriberService extends AbstractMessagingSubscrib
Constants.Metrics.Tag.INSTANCE_ID, "0",
Constants.Metrics.Tag.NAMESPACE, NamespaceId.SYSTEM.getNamespace(),
Constants.Metrics.Tag.TOPIC, cConf.get(Constants.Metadata.MESSAGING_TOPIC),
Constants.Metrics.Tag.CONSUMER, Constants.Metadata.METADATA_WRITER_SUBSCRIBER
Constants.Metrics.Tag.CONSUMER, Constants.Metadata.METADATA_CONSUMER_WRITER_SUBSCRIBER
)));
this.messagingContext = new MultiThreadMessagingContext(messagingService);
this.transactionRunner = transactionRunner;
Expand Down Expand Up @@ -141,16 +141,22 @@ protected MetadataMessage decodeMessage(Message message) {
@Override
protected String loadMessageId(StructuredTableContext context) throws IOException, TableNotFoundException {
AppMetadataStore appMetadataStore = AppMetadataStore.create(context);
return appMetadataStore.retrieveSubscriberState(getTopicId().getTopic(),
Constants.Metadata.METADATA_WRITER_SUBSCRIBER);
String messageId = appMetadataStore.retrieveSubscriberState(getTopicId().getTopic(),
Constants.Metadata.METADATA_CONSUMER_WRITER_SUBSCRIBER);
if (messageId == null) {
// for backward compatibility in case of upgrade
messageId = appMetadataStore.retrieveSubscriberState(getTopicId().getTopic(),
Constants.Metadata.METADATA_WRITER_SUBSCRIBER);
}
return messageId;
}

@Override
protected void storeMessageId(StructuredTableContext context, String messageId)
throws IOException, TableNotFoundException {
AppMetadataStore appMetadataStore = AppMetadataStore.create(context);
appMetadataStore.persistSubscriberState(getTopicId().getTopic(),
Constants.Metadata.METADATA_WRITER_SUBSCRIBER, messageId);
Constants.Metadata.METADATA_CONSUMER_WRITER_SUBSCRIBER, messageId);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,8 @@
import io.cdap.cdap.data2.registry.UsageTable;
import io.cdap.cdap.internal.app.runtime.workflow.BasicWorkflowToken;
import io.cdap.cdap.internal.app.store.AppMetadataStore;
import io.cdap.cdap.messaging.spi.MessagingService;
import io.cdap.cdap.messaging.context.MultiThreadMessagingContext;
import io.cdap.cdap.messaging.spi.MessagingService;
import io.cdap.cdap.messaging.subscriber.AbstractMessagingSubscriberService;
import io.cdap.cdap.metadata.profile.ProfileMetadataMessageProcessor;
import io.cdap.cdap.proto.NamespaceMeta;
Expand Down Expand Up @@ -138,7 +138,7 @@ public class MetadataSubscriberService extends AbstractMessagingSubscriberServic
Constants.Metrics.Tag.INSTANCE_ID, "0",
Constants.Metrics.Tag.NAMESPACE, NamespaceId.SYSTEM.getNamespace(),
Constants.Metrics.Tag.TOPIC, cConf.get(Constants.Metadata.MESSAGING_TOPIC),
Constants.Metrics.Tag.CONSUMER, "metadata.writer"
Constants.Metrics.Tag.CONSUMER, Constants.Metadata.METADATA_WRITER_SUBSCRIBER
)));

this.cConf = cConf;
Expand Down Expand Up @@ -169,14 +169,16 @@ protected MetadataMessage decodeMessage(Message message) {
protected String loadMessageId(StructuredTableContext context)
throws IOException, TableNotFoundException {
AppMetadataStore appMetadataStore = AppMetadataStore.create(context);
return appMetadataStore.retrieveSubscriberState(getTopicId().getTopic(), "metadata.writer");
return appMetadataStore.retrieveSubscriberState(getTopicId().getTopic(),
Constants.Metadata.METADATA_WRITER_SUBSCRIBER);
}

@Override
protected void storeMessageId(StructuredTableContext context, String messageId)
throws IOException, TableNotFoundException {
AppMetadataStore appMetadataStore = AppMetadataStore.create(context);
appMetadataStore.persistSubscriberState(getTopicId().getTopic(), "metadata.writer", messageId);
appMetadataStore.persistSubscriberState(getTopicId().getTopic(),
Constants.Metadata.METADATA_WRITER_SUBSCRIBER, messageId);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2070,6 +2070,7 @@ public static final class Metadata {
public static final String STORAGE_PROVIDER_ELASTICSEARCH = "elastic";

public static final String METADATA_WRITER_SUBSCRIBER = "metadata.writer";
public static final String METADATA_CONSUMER_WRITER_SUBSCRIBER = "metadata.consumer.writer";
}

/**
Expand Down

0 comments on commit 485a516

Please sign in to comment.