Add stateful buffer for lambda sink #5354
Open
srikanthjg
wants to merge
3
commits into
opensearch-project:main
Choose a base branch
from
srikanthjg:lambda-sink-stateful
base: main
+383 −209
Changes from all commits (3 commits)
@@ -22,10 +22,13 @@
import org.opensearch.dataprepper.model.sink.OutputCodecContext;
import org.opensearch.dataprepper.model.sink.Sink;
import org.opensearch.dataprepper.model.sink.SinkContext;
import org.opensearch.dataprepper.model.types.ByteCount;
import org.opensearch.dataprepper.plugins.lambda.common.LambdaCommonHandler;
import org.opensearch.dataprepper.plugins.lambda.common.accumlator.Buffer;
import org.opensearch.dataprepper.plugins.lambda.common.accumlator.InMemoryBuffer;
import org.opensearch.dataprepper.plugins.lambda.common.client.LambdaClientFactory;
import org.opensearch.dataprepper.plugins.lambda.common.config.ClientOptions;
import org.opensearch.dataprepper.plugins.lambda.common.util.ThresholdCheck;
import org.opensearch.dataprepper.plugins.lambda.sink.dlq.DlqPushHandler;
import org.opensearch.dataprepper.plugins.lambda.sink.dlq.LambdaSinkFailedDlqData;
import org.opensearch.dataprepper.model.failures.DlqObject;

@@ -38,7 +41,7 @@
import java.time.Duration;
import java.util.Collection;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

@@ -75,6 +78,13 @@ public class LambdaSink extends AbstractSink<Record<Event>> {
private final OutputCodecContext outputCodecContext;
private volatile boolean sinkInitialized;
private DlqPushHandler dlqPushHandler = null;
final int maxEvents;
final long maxBytes;
final Duration maxCollectTime;

// The partial buffer that may not yet have reached threshold.
// Access must be synchronized
private Buffer statefulBuffer;

@DataPrepperPluginConstructor
public LambdaSink(final PluginSetting pluginSetting,

@@ -90,6 +100,9 @@ public LambdaSink(final PluginSetting pluginSetting,
this.lambdaSinkConfig = lambdaSinkConfig;
this.expressionEvaluator = expressionEvaluator;
this.outputCodecContext = OutputCodecContext.fromSinkContext(sinkContext);
this.maxEvents = lambdaSinkConfig.getBatchOptions().getThresholdOptions().getEventCount();
this.maxBytes = lambdaSinkConfig.getBatchOptions().getThresholdOptions().getMaximumSize().getBytes();
this.maxCollectTime = lambdaSinkConfig.getBatchOptions().getThresholdOptions().getEventCollectTimeOut();

this.numberOfRecordsSuccessCounter = pluginMetrics.counter(
NUMBER_OF_RECORDS_FLUSHED_TO_LAMBDA_SUCCESS);

@@ -138,57 +151,59 @@ public void doInitialize() {
}

private void doInitializeInternal() {
// Initialize the partial buffer
statefulBuffer = new InMemoryBuffer(
lambdaSinkConfig.getBatchOptions().getKeyName(),
outputCodecContext
);
sinkInitialized = Boolean.TRUE;
}

/**
* @param records Records to be output
* We only flush the partial buffer if we're shutting down or if we want to
* do a time-based flush.
*/
@Override
public void doOutput(final Collection<Record<Event>> records) {
public synchronized void shutdown() {
// Flush the partial buffer if any leftover
if (statefulBuffer.getEventCount() > 0) {
flushBuffers(Collections.singletonList(statefulBuffer));
}
}

@Override
public synchronized void doOutput(final Collection<Record<Event>> records) {
if (!sinkInitialized) {
LOG.warn("LambdaSink doOutput called before initialization");
return;
}
if (records.isEmpty()) {
return;
}

Map<Buffer, CompletableFuture<InvokeResponse>> bufferToFutureMap = new HashMap<>();
try {
// Result from lambda is not currently processed.
bufferToFutureMap = LambdaCommonHandler.sendRecords(
records,
lambdaSinkConfig,
lambdaAsyncClient,
outputCodecContext);
} catch (Exception e) {
LOG.error("Exception while processing records ", e);
handleFailure(records, e, HttpURLConnection.HTTP_BAD_REQUEST);
}
// We'll collect any "full" buffers in a local list, flush them at the end
List<Buffer> fullBuffers = new ArrayList<>();

for (Map.Entry<Buffer, CompletableFuture<InvokeResponse>> entry : bufferToFutureMap.entrySet()) {
CompletableFuture<InvokeResponse> future = entry.getValue();
Buffer inputBuffer = entry.getKey();
try {
InvokeResponse response = future.join();
Duration latency = inputBuffer.stopLatencyWatch();
lambdaLatencyMetric.record(latency.toMillis(), TimeUnit.MILLISECONDS);
requestPayloadMetric.record(inputBuffer.getPayloadRequestSize());
if (!isSuccess(response)) {
String errorMessage = String.format("Lambda invoke failed with status code %s error %s ",
response.statusCode(), response.payload().asUtf8String());
throw new RuntimeException(errorMessage);
}

releaseEventHandles(inputBuffer.getRecords(), true);
numberOfRecordsSuccessCounter.increment(inputBuffer.getEventCount());
numberOfRequestsSuccessCounter.increment();
if (response.payload() != null) {
responsePayloadMetric.record(response.payload().asByteArray().length);
}
// Add to the persistent buffer, check threshold
for (Record<Event> record : records) {
// statefulBuffer is either empty or partially filled (from previous run)
statefulBuffer.addRecord(record);

} catch (Exception e) {
LOG.error(NOISY, e.getMessage(), e);
handleFailure(inputBuffer.getRecords(), new RuntimeException("failed"), HttpURLConnection.HTTP_INTERNAL_ERROR);
if (isThresholdExceeded(statefulBuffer)) {
// This buffer is full
fullBuffers.add(statefulBuffer);
// Create new partial buffer
statefulBuffer = new InMemoryBuffer(
lambdaSinkConfig.getBatchOptions().getKeyName(),
outputCodecContext
);
}
}

// Flush any full buffers
if (!fullBuffers.isEmpty()) {
flushBuffers(fullBuffers);
}
}

@@ -210,7 +225,7 @@ private DlqObject createDlqObjectFromEvent(final Event event,
.build();
}

void handleFailure(Collection<Record<Event>> failedRecords, Throwable throwable, int statusCode) {
synchronized void handleFailure(Collection<Record<Event>> failedRecords, Throwable throwable, int statusCode) {
if (failedRecords.isEmpty()) {
return;
}

@@ -249,4 +264,65 @@ private void releaseEventHandles(Collection<Record<Event>> records, boolean succ
}
}
}

private synchronized void flushBuffers(final List<Buffer> buffersToFlush) {
// Combine all their records for a single call to sendRecords
List<Record<Event>> combinedRecords = new ArrayList<>();
for (Buffer buf : buffersToFlush) {
combinedRecords.addAll(buf.getRecords());
}

Comment on lines +271 to +273: I may not be clear on the overall approach here, but why are we combining the buffers here? Also, maybe not a big deal, but we are creating an additional copy of the payload here.

Map<Buffer, CompletableFuture<InvokeResponse>> bufferToFutureMap;
try {
bufferToFutureMap = LambdaCommonHandler.sendRecords(
combinedRecords,
lambdaSinkConfig,
lambdaAsyncClient,
outputCodecContext
);
} catch (Exception e) {
LOG.error(NOISY, "Error sending buffers to Lambda", e);
handleFailure(combinedRecords, e, HttpURLConnection.HTTP_INTERNAL_ERROR);
return;
}

for (Map.Entry<Buffer, CompletableFuture<InvokeResponse>> entry : bufferToFutureMap.entrySet()) {
Buffer inputBuffer = entry.getKey();
CompletableFuture<InvokeResponse> future = entry.getValue();

try {
InvokeResponse response = future.join();
Duration latency = inputBuffer.stopLatencyWatch();
lambdaLatencyMetric.record(latency.toMillis(), TimeUnit.MILLISECONDS);
requestPayloadMetric.record(inputBuffer.getPayloadRequestSize());
if (!isSuccess(response)) {
String errorMsg = String.format(
"Lambda invoke failed with code %d, error: %s",
response.statusCode(),
response.payload() != null ? response.payload().asUtf8String() : "No payload"
);
throw new RuntimeException(errorMsg);
}

releaseEventHandles(inputBuffer.getRecords(), true);
numberOfRecordsSuccessCounter.increment(inputBuffer.getEventCount());
numberOfRequestsSuccessCounter.increment();
if (response.payload() != null) {
responsePayloadMetric.record(response.payload().asByteArray().length);
}
} catch (Exception ex) {
LOG.error(NOISY, "Error handling future response from Lambda", ex);
handleFailure(inputBuffer.getRecords(), ex, HttpURLConnection.HTTP_INTERNAL_ERROR);
}
}
}

private boolean isThresholdExceeded(Buffer buffer) {
return ThresholdCheck.checkThresholdExceed(
buffer,
maxEvents,
ByteCount.ofBytes(maxBytes),
maxCollectTime
);
}
}
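
To make the buffering model easier to follow outside the diff, here is a minimal sketch of the same pattern: records accumulate in a partial buffer, crossing any of the three thresholds (event count, byte size, collect time) triggers a flush, and shutdown flushes whatever is left over. Everything in it is illustrative; the SimpleStatefulBuffer class, the String records, and the flush callback are hypothetical stand-ins, not the plugin's Buffer, ThresholdCheck, or LambdaCommonHandler APIs.

```java
// Illustrative sketch only: a stateful buffer with event-count, byte-size,
// and collect-time thresholds, flushed on shutdown. All names here are
// hypothetical and not part of the Data Prepper plugin API.
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

public class SimpleStatefulBuffer {
    private final int maxEvents;
    private final long maxBytes;
    private final Duration maxCollectTime;
    private final Consumer<List<String>> flusher;

    private List<String> records = new ArrayList<>();
    private long bufferedBytes = 0;
    private Instant firstRecordTime = null;

    public SimpleStatefulBuffer(int maxEvents, long maxBytes, Duration maxCollectTime,
                                Consumer<List<String>> flusher) {
        this.maxEvents = maxEvents;
        this.maxBytes = maxBytes;
        this.maxCollectTime = maxCollectTime;
        this.flusher = flusher;
    }

    // Add a record to the partial buffer and flush if any threshold is exceeded.
    public synchronized void add(String record) {
        if (records.isEmpty()) {
            firstRecordTime = Instant.now();
        }
        records.add(record);
        bufferedBytes += record.getBytes().length;
        if (thresholdExceeded()) {
            flush();
        }
    }

    // Flush whatever is left, e.g. when the sink shuts down.
    public synchronized void shutdown() {
        if (!records.isEmpty()) {
            flush();
        }
    }

    private boolean thresholdExceeded() {
        return records.size() >= maxEvents
                || bufferedBytes >= maxBytes
                || Duration.between(firstRecordTime, Instant.now()).compareTo(maxCollectTime) >= 0;
    }

    private void flush() {
        flusher.accept(records);
        records = new ArrayList<>();
        bufferedBytes = 0;
        firstRecordTime = null;
    }

    public static void main(String[] args) {
        SimpleStatefulBuffer buffer = new SimpleStatefulBuffer(
                3, 1024, Duration.ofSeconds(10),
                batch -> System.out.println("flushing " + batch.size() + " records"));
        buffer.add("a");
        buffer.add("b");
        buffer.add("c");   // third record crosses the event-count threshold and triggers a flush
        buffer.add("d");
        buffer.shutdown(); // flushes the leftover partial buffer
    }
}
```

The main method shows the two flush paths that matter for this PR: a threshold-driven flush while data is arriving, and a final flush of the partial buffer at shutdown.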
This should handle concurrent actions?
Yes, I have used "synchronized" on all of the methods where the stateful buffer is used. In the S3 sink, they use a ReentrantLock to handle something similar.
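
For comparison with the S3 sink approach mentioned above, here is a minimal sketch of guarding the partial buffer with a ReentrantLock instead of synchronized methods. The class name, the fixed threshold, and the flushLocked method are hypothetical; this is not the S3 sink's actual implementation, only an illustration of the locking pattern.

```java
// Illustrative sketch only: an explicit lock guarding the partial buffer,
// instead of synchronized methods. All names and the threshold are hypothetical.
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.locks.ReentrantLock;

public class LockGuardedSink {
    private final ReentrantLock bufferLock = new ReentrantLock();
    private List<String> partialBuffer = new ArrayList<>();

    public void output(List<String> records) {
        bufferLock.lock();
        try {
            partialBuffer.addAll(records);
            if (partialBuffer.size() >= 100) {   // hypothetical threshold
                flushLocked();
            }
        } finally {
            bufferLock.unlock();
        }
    }

    public void shutdown() {
        bufferLock.lock();
        try {
            if (!partialBuffer.isEmpty()) {
                flushLocked();
            }
        } finally {
            bufferLock.unlock();
        }
    }

    // Caller must hold bufferLock.
    private void flushLocked() {
        System.out.println("flushing " + partialBuffer.size() + " records");
        partialBuffer = new ArrayList<>();
    }
}
```

Both approaches serialize access to the partial buffer; an explicit lock mainly makes the critical sections visible and decouples them from the sink object's monitor.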