
Handling end to end acknowledgement in Jira Source #5344

Merged · 9 commits · Jan 23, 2025
JiraClient.java
@@ -14,6 +14,7 @@
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;
import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSet;
import org.opensearch.dataprepper.model.buffer.Buffer;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.event.EventType;
@@ -87,7 +88,7 @@ void injectObjectMapper(ObjectMapper objectMapper) {
@Override
public void executePartition(SaasWorkerProgressState state,
Buffer<Record<Event>> buffer,
CrawlerSourceConfig configuration) {
AcknowledgementSet acknowledgementSet) {
log.trace("Executing the partition: {} with {} ticket(s)",
state.getKeyAttributes(), state.getItemIds().size());
List<String> itemIds = state.getItemIds();
@@ -130,7 +131,13 @@ public void executePartition(SaasWorkerProgressState state,
.collect(Collectors.toList());

try {
buffer.writeAll(recordsToWrite, (int) Duration.ofSeconds(bufferWriteTimeoutInSeconds).toMillis());
if (configuration.isAcknowledgments()) {
recordsToWrite.forEach(eventRecord -> acknowledgementSet.add(eventRecord.getData()));
buffer.writeAll(recordsToWrite, (int) Duration.ofSeconds(bufferWriteTimeoutInSeconds).toMillis());
acknowledgementSet.complete();
} else {
buffer.writeAll(recordsToWrite, (int) Duration.ofSeconds(bufferWriteTimeoutInSeconds).toMillis());
}
} catch (Exception e) {
throw new RuntimeException(e);
}
JiraSourceConfig.java
@@ -59,6 +59,13 @@ public class JiraSourceConfig implements CrawlerSourceConfig {
@JsonProperty("backoff_time")
private Duration backOff = DEFAULT_BACKOFF_MILLIS;

/**
 * Boolean property indicating whether end-to-end acknowledgments are enabled for this source
*/
@JsonProperty("acknowledgments")
@Getter
private boolean acknowledgments = false;

public String getAccountUrl() {
return this.getHosts().get(0);
}
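For context, the new flag is read from the pipeline YAML via the @JsonProperty binding above. A minimal sketch of a pipeline that turns it on; only `acknowledgments` and `hosts` come from this config class, while the pipeline name and sink are illustrative placeholders:

```yaml
# Sketch: enabling end-to-end acknowledgments on the jira source.
# Only `acknowledgments` and `hosts` come from JiraSourceConfig above;
# everything else is an illustrative placeholder.
jira-pipeline:
  source:
    jira:
      hosts: ["https://your-domain.atlassian.net"]
      acknowledgments: true
  sink:
    - opensearch:
        hosts: ["https://localhost:9200"]
```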
JiraOauthConfig.java
@@ -12,6 +12,7 @@

import lombok.Getter;
import org.opensearch.dataprepper.plugins.source.jira.JiraSourceConfig;
import org.opensearch.dataprepper.plugins.source.jira.configuration.Oauth2Config;
import org.opensearch.dataprepper.plugins.source.jira.exception.UnAuthorizedException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -126,6 +127,7 @@ public void renewCredentials() {
String payload = String.format(payloadTemplate, "refresh_token", clientId, clientSecret, refreshToken);
HttpEntity<String> entity = new HttpEntity<>(payload, headers);

Oauth2Config oauth2Config = jiraSourceConfig.getAuthenticationConfig().getOauth2Config();
try {
ResponseEntity<Map> responseEntity = restTemplate.postForEntity(TOKEN_LOCATION, entity, Map.class);
Map<String, Object> oauthClientResponse = responseEntity.getBody();
@@ -134,10 +136,8 @@ public void renewCredentials() {
this.expiresInSeconds = (int) oauthClientResponse.get(EXPIRES_IN);
this.expireTime = Instant.ofEpochMilli(System.currentTimeMillis() + (expiresInSeconds * 1000L));
// update the config object's PluginConfigVariable so that the underlying Secret store is updated
jiraSourceConfig.getAuthenticationConfig().getOauth2Config().getAccessToken()
.setValue(this.accessToken);
jiraSourceConfig.getAuthenticationConfig().getOauth2Config().getRefreshToken()
.setValue(this.refreshToken);
oauth2Config.getAccessToken().setValue(this.accessToken);
oauth2Config.getRefreshToken().setValue(this.refreshToken);
log.info("Access Token and Refresh Token pair is now refreshed. Corresponding Secret store key updated.");
} catch (HttpClientErrorException ex) {
this.expireTime = Instant.ofEpochMilli(0);
@@ -147,9 +147,12 @@ public void renewCredentials() {
statusCode, ex.getMessage());
if (statusCode == HttpStatus.FORBIDDEN || statusCode == HttpStatus.UNAUTHORIZED) {
log.info("Trying to refresh the secrets");
// Try refreshing the secrets and see if that helps
// Refreshing one of the secret refreshes the entire store so we are good to trigger refresh on just one
jiraSourceConfig.getAuthenticationConfig().getOauth2Config().getAccessToken().refresh();
// Refresh the secrets; this helps if another node has already renewed the tokens.
// Refreshing one of the secrets reloads the entire store, so triggering refresh on just one is enough
oauth2Config.getAccessToken().refresh();
this.accessToken = (String) oauth2Config.getAccessToken().getValue();
this.refreshToken = (String) oauth2Config.getRefreshToken().getValue();
this.expireTime = Instant.ofEpochMilli(System.currentTimeMillis() + (expiresInSeconds * 1000L));
}
throw new RuntimeException("Failed to renew access token message:" + ex.getMessage(), ex);
}
JiraClientTest.java
@@ -18,10 +18,10 @@
import org.mockito.ArgumentCaptor;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSet;
import org.opensearch.dataprepper.model.buffer.Buffer;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.record.Record;
import org.opensearch.dataprepper.plugins.source.source_crawler.base.CrawlerSourceConfig;
import org.opensearch.dataprepper.plugins.source.source_crawler.base.PluginExecutorServiceProvider;
import org.opensearch.dataprepper.plugins.source.source_crawler.coordination.state.SaasWorkerProgressState;

@@ -46,26 +46,21 @@
@ExtendWith(MockitoExtension.class)
public class JiraClientTest {

private final PluginExecutorServiceProvider executorServiceProvider = new PluginExecutorServiceProvider();
@Mock
private Buffer<Record<Event>> buffer;

@Mock
private SaasWorkerProgressState saasWorkerProgressState;

@Mock
private CrawlerSourceConfig crawlerSourceConfig;

private AcknowledgementSet acknowledgementSet;
@Mock
private JiraSourceConfig jiraSourceConfig;

@Mock
private JiraService jiraService;

@Mock
private JiraIterator jiraIterator;

private PluginExecutorServiceProvider executorServiceProvider = new PluginExecutorServiceProvider();

@Test
void testConstructor() {
JiraClient jiraClient = new JiraClient(jiraService, jiraIterator, executorServiceProvider, jiraSourceConfig);
@@ -98,7 +93,7 @@ void testExecutePartition() throws Exception {

ArgumentCaptor<Collection<Record<Event>>> recordsCaptor = ArgumentCaptor.forClass((Class) Collection.class);

jiraClient.executePartition(saasWorkerProgressState, buffer, crawlerSourceConfig);
jiraClient.executePartition(saasWorkerProgressState, buffer, acknowledgementSet);

verify(buffer).writeAll(recordsCaptor.capture(), anyInt());
Collection<Record<Event>> capturedRecords = recordsCaptor.getValue();
@@ -121,14 +116,13 @@ void testExecutePartitionError() throws Exception {

when(jiraService.getIssue(anyString())).thenReturn("{\"id\":\"ID1\",\"key\":\"TEST-1\"}");

ArgumentCaptor<Collection<Record<Event>>> recordsCaptor = ArgumentCaptor.forClass((Class) Collection.class);

ObjectMapper mockObjectMapper = mock(ObjectMapper.class);
when(mockObjectMapper.readValue(any(String.class), any(TypeReference.class))).thenThrow(new JsonProcessingException("test") {
});
jiraClient.injectObjectMapper(mockObjectMapper);

assertThrows(RuntimeException.class, () -> jiraClient.executePartition(saasWorkerProgressState, buffer, crawlerSourceConfig));
assertThrows(RuntimeException.class, () -> jiraClient.executePartition(saasWorkerProgressState, buffer, acknowledgementSet));
}

@Test
@@ -147,6 +141,6 @@ void bufferWriteRuntimeTest() throws Exception {
ArgumentCaptor<Collection<Record<Event>>> recordsCaptor = ArgumentCaptor.forClass((Class) Collection.class);

doThrow(new RuntimeException()).when(buffer).writeAll(recordsCaptor.capture(), anyInt());
assertThrows(RuntimeException.class, () -> jiraClient.executePartition(saasWorkerProgressState, buffer, crawlerSourceConfig));
assertThrows(RuntimeException.class, () -> jiraClient.executePartition(saasWorkerProgressState, buffer, acknowledgementSet));
}
}
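One behavior the updated suite does not assert directly is the acknowledgment wiring itself. A hedged sketch of an additional test, not part of this PR; it assumes the same JiraClient and progress-state setup as testExecutePartition above, with the mocked issue plumbing elided:

```java
@Test
void testExecutePartitionWithAcknowledgments() throws Exception {
    // Sketch only: setup mirrors testExecutePartition, but with
    // acknowledgments enabled on the mocked source config.
    when(jiraSourceConfig.isAcknowledgments()).thenReturn(true);

    jiraClient.executePartition(saasWorkerProgressState, buffer, acknowledgementSet);

    // Every written event should be registered with the set, and the
    // set completed after the buffer write succeeds.
    verify(acknowledgementSet, atLeastOnce()).add(any(Event.class));
    verify(acknowledgementSet).complete();
}
```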
Crawler.java
@@ -2,6 +2,7 @@

import io.micrometer.core.instrument.Timer;
import org.opensearch.dataprepper.metrics.PluginMetrics;
import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSet;
import org.opensearch.dataprepper.model.buffer.Buffer;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.record.Record;
@@ -23,7 +24,7 @@
@Named
public class Crawler {
private static final Logger log = LoggerFactory.getLogger(Crawler.class);
private static final int maxItemsPerPage = 50;
private static final int maxItemsPerPage = 100;
private final Timer crawlingTimer;
private final PluginMetrics pluginMetrics =
PluginMetrics.fromNames("sourceCrawler", "crawler");
@@ -61,8 +62,8 @@ public Instant crawl(Instant lastPollTime,
return Instant.ofEpochMilli(startTime);
}

public void executePartition(SaasWorkerProgressState state, Buffer<Record<Event>> buffer, CrawlerSourceConfig sourceConfig) {
client.executePartition(state, buffer, sourceConfig);
public void executePartition(SaasWorkerProgressState state, Buffer<Record<Event>> buffer, AcknowledgementSet acknowledgementSet) {
client.executePartition(state, buffer, acknowledgementSet);
}

private void createPartition(List<ItemInfo> itemInfoList, EnhancedSourceCoordinator coordinator) {
CrawlerClient.java
@@ -1,5 +1,6 @@
package org.opensearch.dataprepper.plugins.source.source_crawler.base;

import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSet;
import org.opensearch.dataprepper.model.buffer.Buffer;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.record.Record;
@@ -36,9 +37,9 @@ public interface CrawlerClient {
/**
* Method for executing a particular partition or a chunk of work
*
* @param state worker node state holds the details of this particular chunk of work
* @param buffer pipeline buffer to write the results into
* @param sourceConfig pipeline configuration from the yaml
* @param state worker node state holds the details of this particular chunk of work
* @param buffer pipeline buffer to write the results into
* @param acknowledgementSet acknowledgement set to be used to track the completion of the partition
*/
void executePartition(SaasWorkerProgressState state, Buffer<Record<Event>> buffer, CrawlerSourceConfig sourceConfig);
void executePartition(SaasWorkerProgressState state, Buffer<Record<Event>> buffer, AcknowledgementSet acknowledgementSet);
}
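A skeletal implementation of the updated contract, mirroring what JiraClient does above. This is a sketch, not merged code: fetchRecords is a hypothetical placeholder, the null check follows WorkerScheduler below (which passes a null acknowledgement set when acks are disabled), and imports match the files shown above:

```java
// Sketch of a CrawlerClient honoring the new signature (imports omitted;
// they match the interface file above). `fetchRecords` is hypothetical.
public class MyCrawlerClient implements CrawlerClient {
    @Override
    public void executePartition(SaasWorkerProgressState state,
                                 Buffer<Record<Event>> buffer,
                                 AcknowledgementSet acknowledgementSet) {
        List<Record<Event>> records = fetchRecords(state); // source-specific extraction
        try {
            if (acknowledgementSet != null) { // null when acknowledgments are disabled
                records.forEach(r -> acknowledgementSet.add(r.getData()));
            }
            buffer.writeAll(records, 10_000);
            if (acknowledgementSet != null) {
                acknowledgementSet.complete(); // no more events will be added
            }
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    private List<Record<Event>> fetchRecords(SaasWorkerProgressState state) {
        return List.of(); // placeholder
    }
}
```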
CrawlerSourceConfig.java
@@ -6,4 +6,11 @@
public interface CrawlerSourceConfig {

int DEFAULT_NUMBER_OF_WORKERS = 1;

/**
 * Boolean to indicate whether acknowledgments are enabled for this source
*
* @return boolean indicating acknowledgement state
*/
boolean isAcknowledgments();
}
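Note the interplay with JiraSourceConfig above: Lombok's @Getter on a boolean field named acknowledgments generates exactly isAcknowledgments(), which satisfies this interface method. A hand-written equivalent, as a sketch:

```java
// Hand-written equivalent of what Lombok's @Getter generates for the
// boolean `acknowledgments` field in JiraSourceConfig (sketch).
public class MySourceConfig implements CrawlerSourceConfig {
    @JsonProperty("acknowledgments")
    private boolean acknowledgments = false;

    @Override
    public boolean isAcknowledgments() {
        return acknowledgments;
    }
}
```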
CrawlerSourcePlugin.java
@@ -78,7 +78,8 @@ public void start(Buffer<Record<Event>> buffer) {
this.executorService.submit(leaderScheduler);
//Register worker threads
for (int i = 0; i < sourceConfig.DEFAULT_NUMBER_OF_WORKERS; i++) {
WorkerScheduler workerScheduler = new WorkerScheduler(buffer, coordinator, sourceConfig, crawler);
WorkerScheduler workerScheduler = new WorkerScheduler(sourcePluginName, buffer, coordinator,
sourceConfig, crawler, pluginMetrics, acknowledgementSetManager);
this.executorService.submit(new Thread(workerScheduler));
}
}
WorkerScheduler.java
@@ -1,5 +1,9 @@
package org.opensearch.dataprepper.plugins.source.source_crawler.coordination.scheduler;

import io.micrometer.core.instrument.Counter;
import org.opensearch.dataprepper.metrics.PluginMetrics;
import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSet;
import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSetManager;
import org.opensearch.dataprepper.model.buffer.Buffer;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.record.Record;
@@ -21,24 +25,41 @@
*/
public class WorkerScheduler implements Runnable {

public static final String ACKNOWLEDGEMENT_SET_SUCCESS_METRIC_NAME = "acknowledgementSetSuccesses";
public static final String ACKNOWLEDGEMENT_SET_FAILURES_METRIC_NAME = "acknowledgementSetFailures";
private static final Duration ACKNOWLEDGEMENT_SET_TIMEOUT = Duration.ofSeconds(20);
private static final Logger log = LoggerFactory.getLogger(WorkerScheduler.class);
private static final int RETRY_BACKOFF_ON_EXCEPTION_MILLIS = 5_000;
private static final Duration DEFAULT_SLEEP_DURATION_MILLIS = Duration.ofMillis(10000);

private final EnhancedSourceCoordinator sourceCoordinator;
private final CrawlerSourceConfig sourceConfig;
private final Crawler crawler;
private final Buffer<Record<Event>> buffer;
private final PluginMetrics pluginMetrics;
private final AcknowledgementSetManager acknowledgementSetManager;
private final Counter acknowledgementSetSuccesses;
private final Counter acknowledgementSetFailures;
private final String sourcePluginName;
private final String SOURCE_PLUGIN_NAME = "sourcePluginName";


public WorkerScheduler(Buffer<Record<Event>> buffer,
public WorkerScheduler(final String sourcePluginName,
Buffer<Record<Event>> buffer,
EnhancedSourceCoordinator sourceCoordinator,
CrawlerSourceConfig sourceConfig,
Crawler crawler) {
Crawler crawler,
final PluginMetrics pluginMetrics,
final AcknowledgementSetManager acknowledgementSetManager) {
this.sourceCoordinator = sourceCoordinator;
this.sourceConfig = sourceConfig;
this.crawler = crawler;
this.buffer = buffer;
this.sourcePluginName = sourcePluginName;

this.acknowledgementSetManager = acknowledgementSetManager;
this.pluginMetrics = pluginMetrics;
this.acknowledgementSetSuccesses = pluginMetrics.counterWithTags(ACKNOWLEDGEMENT_SET_SUCCESS_METRIC_NAME, SOURCE_PLUGIN_NAME, sourcePluginName);
this.acknowledgementSetFailures = pluginMetrics.counterWithTags(ACKNOWLEDGEMENT_SET_FAILURES_METRIC_NAME, SOURCE_PLUGIN_NAME, sourcePluginName);
}

@Override
@@ -52,7 +73,7 @@ public void run() {
sourceCoordinator.acquireAvailablePartition(SaasSourcePartition.PARTITION_TYPE);
if (partition.isPresent()) {
// Process the partition (source extraction logic)
processPartition(partition.get(), buffer, sourceConfig);
processPartition(partition.get(), buffer);

} else {
log.debug("No partition available. This thread will sleep for {}", DEFAULT_SLEEP_DURATION_MILLIS);
@@ -75,13 +96,31 @@ public void run() {
log.warn("SourceItemWorker Scheduler is interrupted, looks like shutdown has triggered");
}

private void processPartition(EnhancedSourcePartition partition, Buffer<Record<Event>> buffer, CrawlerSourceConfig sourceConfig) {
private void processPartition(EnhancedSourcePartition partition, Buffer<Record<Event>> buffer) {
// Implement your source extraction logic here
// Update the partition state or commit the partition as needed
// Commit the partition to mark it as processed
if (partition.getProgressState().isPresent()) {
crawler.executePartition((SaasWorkerProgressState) partition.getProgressState().get(), buffer, sourceConfig);
AcknowledgementSet acknowledgementSet = null;
if (sourceConfig.isAcknowledgments()) {
acknowledgementSet = createAcknowledgementSet(partition);
}
crawler.executePartition((SaasWorkerProgressState) partition.getProgressState().get(), buffer, acknowledgementSet);
}
sourceCoordinator.completePartition(partition);
}

private AcknowledgementSet createAcknowledgementSet(EnhancedSourcePartition partition) {
return acknowledgementSetManager.create((result) -> {
if (result) {
acknowledgementSetSuccesses.increment();
sourceCoordinator.completePartition(partition);
log.debug("acknowledgements received for partitionKey: {}", partition.getPartitionKey());
} else {
acknowledgementSetFailures.increment();
log.debug("acknowledgements received with false for partitionKey: {}", partition.getPartitionKey());
sourceCoordinator.giveUpPartition(partition);
}
}, ACKNOWLEDGEMENT_SET_TIMEOUT);
}
}
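End to end, the flow stitched together by this class and the client is: create the set with a completion callback and timeout, register each event, write to the buffer, call complete(), then let downstream event-handle releases drive the callback. A minimal lifecycle sketch under those assumptions; the release call at the end is standard Data Prepper event-handle behavior and normally happens inside the sink, shown here only for illustration:

```java
// Minimal acknowledgement-set lifecycle sketch (illustrative; imports
// omitted, they match the files above).
void writePartitionWithAcks(List<Record<Event>> records,
                            Buffer<Record<Event>> buffer,
                            AcknowledgementSetManager acknowledgementSetManager) throws Exception {
    AcknowledgementSet ackSet = acknowledgementSetManager.create(
            result -> { /* true => all events acked; false => negative ack or timeout */ },
            Duration.ofSeconds(20)); // expiry timeout, as in ACKNOWLEDGEMENT_SET_TIMEOUT

    records.forEach(r -> ackSet.add(r.getData())); // register every event for this partition
    buffer.writeAll(records, 10_000);              // hand records to the pipeline
    ackSet.complete();                             // signal that no more events will be added

    // Downstream (normally inside the sink), each event handle is released;
    // all-positive releases make the callback above run with result == true:
    // event.getEventHandle().release(true);
}
```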
CrawlerTest.java
@@ -5,6 +5,7 @@
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSet;
import org.opensearch.dataprepper.model.buffer.Buffer;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.record.Record;
@@ -32,7 +33,7 @@
@ExtendWith(MockitoExtension.class)
public class CrawlerTest {
@Mock
private CrawlerSourceConfig sourceConfig;
private AcknowledgementSet acknowledgementSet;

@Mock
private EnhancedSourceCoordinator coordinator;
@@ -60,8 +61,8 @@ public void crawlerConstructionTest() {

@Test
public void executePartitionTest() {
crawler.executePartition(state, buffer, sourceConfig);
verify(client).executePartition(state, buffer, sourceConfig);
crawler.executePartition(state, buffer, acknowledgementSet);
verify(client).executePartition(state, buffer, acknowledgementSet);
}

@Test