diff --git a/src/Serilog.Sinks.AwsCloudWatch/AwsCloudWatchConfigurationExtension.cs b/src/Serilog.Sinks.AwsCloudWatch/AwsCloudWatchConfigurationExtension.cs
index 4fd5b27..cfef9a1 100644
--- a/src/Serilog.Sinks.AwsCloudWatch/AwsCloudWatchConfigurationExtension.cs
+++ b/src/Serilog.Sinks.AwsCloudWatch/AwsCloudWatchConfigurationExtension.cs
@@ -7,6 +7,7 @@
using Serilog.Formatting;
using Serilog.Formatting.Json;
using Serilog.Sinks.AwsCloudWatch.LogStreamNameProvider;
+using Serilog.Sinks.PeriodicBatching;
namespace Serilog.Sinks.AwsCloudWatch
{
@@ -37,9 +38,16 @@ public static LoggerConfiguration AmazonCloudWatch(this LoggerSinkConfiguration
{
throw new ArgumentNullException(nameof(cloudWatchClient));
}
+
+ // the batched sink is
+ var batchedSink = new PeriodicBatchingSinkImplementationCallback(cloudWatchClient, options);
- // create the sink
- var sink = new CloudWatchLogSink(cloudWatchClient, options);
+ var sink = new PeriodicBatchingSink(batchedSink, new()
+ {
+ BatchSizeLimit = options.BatchSizeLimit,
+ Period = options.Period,
+ QueueLimit = options.QueueSizeLimit
+ });
// register the sink
return loggerConfiguration.Sink(sink, options.MinimumLogEventLevel);
@@ -180,10 +188,8 @@ public static LoggerConfiguration AmazonCloudWatch(
}
var client = cloudWatchClient ?? new AmazonCloudWatchLogsClient();
-
- // Create and register the sink
- var sink = new CloudWatchLogSink(client, options);
- return loggerConfiguration.Sink(sink, options.MinimumLogEventLevel);
+
+ return AmazonCloudWatch(loggerConfiguration, options, client);
}
}
}
diff --git a/src/Serilog.Sinks.AwsCloudWatch/CloudWatchLogSink.cs b/src/Serilog.Sinks.AwsCloudWatch/CloudWatchLogSink.cs
index 4494b99..2b1b390 100644
--- a/src/Serilog.Sinks.AwsCloudWatch/CloudWatchLogSink.cs
+++ b/src/Serilog.Sinks.AwsCloudWatch/CloudWatchLogSink.cs
@@ -1,14 +1,7 @@
using Amazon.CloudWatchLogs;
-using Amazon.CloudWatchLogs.Model;
using Serilog.Events;
-using Serilog.Formatting;
using Serilog.Sinks.PeriodicBatching;
using System;
-using System.Collections.Generic;
-using System.IO;
-using System.Linq;
-using System.Text;
-using System.Threading;
using System.Threading.Tasks;
using Serilog.Core;
@@ -17,9 +10,10 @@ namespace Serilog.Sinks.AwsCloudWatch
///
/// A Serilog log sink that publishes to AWS CloudWatch Logs
///
- ///
- public class CloudWatchLogSink : ILogEventSink, IBatchedLogEventSink, IDisposable, IAsyncDisposable
+ public class CloudWatchLogSink : ILogEventSink, IDisposable, IAsyncDisposable
{
+ private readonly PeriodicBatchingSink batchingSink;
+
///
/// The maximum log event size = 256 KB - 26 B
///
@@ -50,17 +44,6 @@ public class CloudWatchLogSink : ILogEventSink, IBatchedLogEventSink, IDisposabl
///
public static readonly TimeSpan ErrorBackoffStartingInterval = TimeSpan.FromMilliseconds(100);
- private readonly IAmazonCloudWatchLogs cloudWatchClient;
- private readonly ICloudWatchSinkOptions options;
- private bool hasInit;
- private string logStreamName;
- private string nextSequenceToken;
- private readonly ITextFormatter textFormatter;
- private readonly PeriodicBatchingSink batchingSink;
-
- private readonly SemaphoreSlim syncObject = new SemaphoreSlim(1);
-
-#pragma warning disable CS0618
///
/// Initializes a new instance of the class.
///
@@ -68,363 +51,10 @@ public class CloudWatchLogSink : ILogEventSink, IBatchedLogEventSink, IDisposabl
/// The options.
public CloudWatchLogSink(IAmazonCloudWatchLogs cloudWatchClient, ICloudWatchSinkOptions options)
{
- if (string.IsNullOrEmpty(options?.LogGroupName))
- {
- throw new ArgumentException($"{nameof(ICloudWatchSinkOptions)}.{nameof(options.LogGroupName)} must be specified.");
- }
- if (options.BatchSizeLimit < 1)
- {
- throw new ArgumentException($"{nameof(ICloudWatchSinkOptions)}.{nameof(options.BatchSizeLimit)} must be a value greater than 0.");
- }
- this.cloudWatchClient = cloudWatchClient;
- this.options = options;
-
- if (options.TextFormatter == null)
- {
- throw new ArgumentException($"{nameof(options.TextFormatter)} is required");
- }
-
- textFormatter = options.TextFormatter;
- batchingSink = new(this, new() { BatchSizeLimit = options.BatchSizeLimit, Period = options.Period, QueueLimit = options.QueueSizeLimit });
- }
-#pragma warning restore CS0618
-
- ///
- /// Ensures the component is initialized.
- ///
- private async Task EnsureInitializedAsync()
- {
- if (hasInit)
- {
- return;
- }
-
- // create log group
- await CreateLogGroupAsync();
-
- // create log stream
- UpdateLogStreamName();
- await CreateLogStreamAsync();
-
- hasInit = true;
- }
-
- ///
- /// Creates the log group.
- ///
- private async Task CreateLogGroupAsync()
- {
- if (options.CreateLogGroup)
- {
- // see if the log group already exists
- var describeRequest = new DescribeLogGroupsRequest
- {
- LogGroupNamePrefix = options.LogGroupName
- };
-
- var logGroups = await cloudWatchClient
- .DescribeLogGroupsAsync(describeRequest);
-
- var logGroup = logGroups
- .LogGroups
- .FirstOrDefault(lg => string.Equals(lg.LogGroupName, options.LogGroupName, StringComparison.Ordinal));
-
- // create log group if it doesn't exist
- if (logGroup == null)
- {
- var createRequest = new CreateLogGroupRequest(options.LogGroupName);
- var createResponse = await cloudWatchClient.CreateLogGroupAsync(createRequest);
-
- // update the retention policy if a specific period is defined
- if (options.LogGroupRetentionPolicy != LogGroupRetentionPolicy.Indefinitely)
- {
- var putRetentionRequest = new PutRetentionPolicyRequest(options.LogGroupName, (int)options.LogGroupRetentionPolicy);
- await cloudWatchClient.PutRetentionPolicyAsync(putRetentionRequest);
- }
- }
- }
- }
-
- ///
- /// Updates the name of the log stream.
- ///
- private void UpdateLogStreamName()
- {
- logStreamName = options.LogStreamNameProvider.GetLogStreamName();
- nextSequenceToken = null; // always reset on a new stream
- }
-
- ///
- /// Creates the log stream if needed.
- ///
- private async Task CreateLogStreamAsync()
- {
- // see if the log stream already exists
- var logStream = await GetLogStreamAsync();
-
- // create log stream if it doesn't exist
- if (logStream == null)
- {
- var createLogStreamRequest = new CreateLogStreamRequest
- {
- LogGroupName = options.LogGroupName,
- LogStreamName = logStreamName
- };
- var createLogStreamResponse = await cloudWatchClient.CreateLogStreamAsync(createLogStreamRequest);
- }
- else
- {
- nextSequenceToken = logStream.UploadSequenceToken;
- }
- }
-
- ///
- /// Updates the log stream sequence token.
- ///
- private async Task UpdateLogStreamSequenceTokenAsync()
- {
- var logStream = await GetLogStreamAsync();
- nextSequenceToken = logStream?.UploadSequenceToken;
- }
-
- ///
- /// Attempts to get the log stream defined by .
- ///
- /// The matching log stream or null if no match can be found.
- private async Task GetLogStreamAsync()
- {
- var describeLogStreamsRequest = new DescribeLogStreamsRequest
- {
- LogGroupName = options.LogGroupName,
- LogStreamNamePrefix = logStreamName
- };
-
- var describeLogStreamsResponse = await cloudWatchClient
- .DescribeLogStreamsAsync(describeLogStreamsRequest);
-
- return describeLogStreamsResponse
- .LogStreams
- .SingleOrDefault(ls => string.Equals(ls.LogStreamName, logStreamName, StringComparison.Ordinal));
+ var batchedSink = new PeriodicBatchingSinkImplementationCallback(cloudWatchClient, options);
+ batchingSink = new(batchedSink, new() { BatchSizeLimit = options.BatchSizeLimit, Period = options.Period, QueueLimit = options.QueueSizeLimit });
}
-
- ///
- /// Creates a batch of events.
- ///
- /// The entire set of log events.
- /// A batch of events meeting defined restrictions.
- private List CreateBatch(Queue logEvents)
- {
- DateTime? first = null;
- var batchSize = 0;
- var batch = new List();
-
- while (batch.Count < MaxLogEventBatchCount && logEvents.Count > 0) // ensure < max batch count
- {
- var @event = logEvents.Peek();
-
- if (!first.HasValue)
- {
- first = @event.Timestamp;
- }
- else if (@event.Timestamp.Subtract(first.Value) > MaxBatchEventSpan) // ensure batch spans no more than 24 hours
- {
- break;
- }
-
- var proposedBatchSize = batchSize + System.Text.Encoding.UTF8.GetByteCount(@event.Message) + MessageBufferSize;
- if (proposedBatchSize < MaxLogEventBatchSize) // ensure < max batch size
- {
- batchSize = proposedBatchSize;
- batch.Add(@event);
- logEvents.Dequeue();
- }
- else
- {
- break;
- }
- }
-
- return batch;
- }
-
- ///
- /// Publish the batch of log events to AWS CloudWatch Logs.
- ///
- /// The request.
- private async Task PublishBatchAsync(List batch)
- {
- if (batch?.Count == 0)
- {
- return;
- }
-
- var success = false;
- var attemptIndex = 0;
- while (!success && attemptIndex <= options.RetryAttempts)
- {
- try
- {
- // creates the request to upload a new event to CloudWatch
- var putLogEventsRequest = new PutLogEventsRequest
- {
- LogGroupName = options.LogGroupName,
- LogStreamName = logStreamName,
- SequenceToken = nextSequenceToken,
- LogEvents = batch
- };
-
- // actually upload the event to CloudWatch
- var putLogEventsResponse = await cloudWatchClient.PutLogEventsAsync(putLogEventsRequest);
-
- // remember the next sequence token, which is required
- nextSequenceToken = putLogEventsResponse.NextSequenceToken;
-
- success = true;
- }
- catch (ServiceUnavailableException e)
- {
- // retry with back-off
- Debugging.SelfLog.WriteLine("Service unavailable. Attempt: {0} Error: {1}", attemptIndex, e);
- await Task.Delay(ErrorBackoffStartingInterval.Milliseconds * (int)Math.Pow(2, attemptIndex));
- attemptIndex++;
- }
- catch (ResourceNotFoundException e)
- {
- // no retry with back-off because..
- // if one of these fails, we get out of the loop.
- // if they're both successful, we don't hit this case again.
- Debugging.SelfLog.WriteLine("Resource was not found. Error: {0}", e);
- await CreateLogGroupAsync();
- await CreateLogStreamAsync();
- }
- catch (DataAlreadyAcceptedException e)
- {
- Debugging.SelfLog.WriteLine("Data already accepted. Attempt: {0} Error: {1}", attemptIndex, e);
- try
- {
- await UpdateLogStreamSequenceTokenAsync();
- }
- catch (Exception ex)
- {
- Debugging.SelfLog.WriteLine("Unable to update log stream sequence. Attempt: {0} Error: {1}", attemptIndex, ex);
-
- // try again with a different log stream
- UpdateLogStreamName();
- await CreateLogStreamAsync();
- }
- attemptIndex++;
- }
- catch (InvalidSequenceTokenException e)
- {
- Debugging.SelfLog.WriteLine("Invalid sequence token. Attempt: {0} Error: {1}", attemptIndex, e);
- try
- {
- await UpdateLogStreamSequenceTokenAsync();
- }
- catch (Exception ex)
- {
- Debugging.SelfLog.WriteLine("Unable to update log stream sequence. Attempt: {0} Error: {1}", attemptIndex, ex);
-
- // try again with a different log stream
- UpdateLogStreamName();
- await CreateLogStreamAsync();
- }
- attemptIndex++;
- }
- catch (Exception e)
- {
- Debugging.SelfLog.WriteLine("Unhandled exception. Error: {0}", e);
- break;
- }
- }
- }
-
- ///
- /// Emit a batch of log events, running asynchronously.
- ///
- /// The events to emit.
- async Task IBatchedLogEventSink.EmitBatchAsync(IEnumerable events)
- {
- try
- {
- await syncObject.WaitAsync();
-
- if (events?.Count() == 0)
- {
- return;
- }
-
- try
- {
- await EnsureInitializedAsync();
- }
- catch (Exception ex)
- {
- Debugging.SelfLog.WriteLine("Error initializing log stream. No logs will be sent to AWS CloudWatch. Exception was {0}.", ex);
- return;
- }
-
- try
- {
- var logEvents =
- new Queue(events
- .OrderBy(e => e.Timestamp) // log events need to be ordered by timestamp within a single bulk upload to CloudWatch
- .Select( // transform
- @event =>
- {
- string message = null;
- using (var writer = new StringWriter())
- {
- textFormatter.Format(@event, writer);
- writer.Flush();
- message = writer.ToString();
- }
- var messageLength = Encoding.UTF8.GetByteCount(message);
- if (messageLength > MaxLogEventSize)
- {
- // truncate event message
- Debugging.SelfLog.WriteLine("Truncating log event with length of {0}", messageLength);
- var buffer = Encoding.UTF8.GetBytes(message);
- message = Encoding.UTF8.GetString(buffer, 0, MaxLogEventSize);
- }
- return new InputLogEvent
- {
- Message = message,
- Timestamp = @event.Timestamp.UtcDateTime
- };
- }));
-
- while (logEvents.Count > 0)
- {
- var batch = CreateBatch(logEvents);
-
- await PublishBatchAsync(batch);
- }
- }
- catch (Exception ex)
- {
- try
- {
- Debugging.SelfLog.WriteLine("Error sending logs. No logs will be sent to AWS CloudWatch. Error was {0}", ex);
- }
- catch
- {
- // we even failed to log to the trace logger - giving up trying to put something out
- }
- }
- }
- finally
- {
- syncObject.Release();
- }
- }
-
- ///
- Task IBatchedLogEventSink.OnEmptyBatchAsync()
- {
- return Task.CompletedTask;
- }
-
+
///
public void Emit(LogEvent logEvent)
{
diff --git a/src/Serilog.Sinks.AwsCloudWatch/PeriodicBatchingSinkImplementationCallback.cs b/src/Serilog.Sinks.AwsCloudWatch/PeriodicBatchingSinkImplementationCallback.cs
new file mode 100644
index 0000000..effcc5e
--- /dev/null
+++ b/src/Serilog.Sinks.AwsCloudWatch/PeriodicBatchingSinkImplementationCallback.cs
@@ -0,0 +1,383 @@
+using System;
+using System.Collections.Generic;
+using System.IO;
+using System.Linq;
+using System.Text;
+using System.Threading;
+using System.Threading.Tasks;
+using Amazon.CloudWatchLogs;
+using Amazon.CloudWatchLogs.Model;
+using Serilog.Events;
+using Serilog.Formatting;
+using Serilog.Sinks.PeriodicBatching;
+
+namespace Serilog.Sinks.AwsCloudWatch;
+
+internal class PeriodicBatchingSinkImplementationCallback: IBatchedLogEventSink
+{
+ private readonly IAmazonCloudWatchLogs cloudWatchClient;
+ private readonly ICloudWatchSinkOptions options;
+ private bool hasInit;
+ private string logStreamName;
+ private string nextSequenceToken;
+ private readonly ITextFormatter textFormatter;
+
+ private readonly SemaphoreSlim syncObject = new SemaphoreSlim(1);
+
+ public PeriodicBatchingSinkImplementationCallback(IAmazonCloudWatchLogs cloudWatchClient, ICloudWatchSinkOptions options)
+ {
+ if (string.IsNullOrEmpty(options?.LogGroupName))
+ {
+ throw new ArgumentException($"{nameof(ICloudWatchSinkOptions)}.{nameof(options.LogGroupName)} must be specified.");
+ }
+ if (options.BatchSizeLimit < 1)
+ {
+ throw new ArgumentException($"{nameof(ICloudWatchSinkOptions)}.{nameof(options.BatchSizeLimit)} must be a value greater than 0.");
+ }
+ this.cloudWatchClient = cloudWatchClient;
+ this.options = options;
+
+ if (options.TextFormatter == null)
+ {
+ throw new ArgumentException($"{nameof(options.TextFormatter)} is required");
+ }
+
+ textFormatter = options.TextFormatter;
+ }
+
+ ///
+ /// Ensures the component is initialized.
+ ///
+ private async Task EnsureInitializedAsync()
+ {
+ if (hasInit)
+ {
+ return;
+ }
+
+ // create log group
+ await CreateLogGroupAsync();
+
+ // create log stream
+ UpdateLogStreamName();
+ await CreateLogStreamAsync();
+
+ hasInit = true;
+ }
+
+ ///
+ /// Creates the log group.
+ ///
+ private async Task CreateLogGroupAsync()
+ {
+ if (options.CreateLogGroup)
+ {
+ // see if the log group already exists
+ var describeRequest = new DescribeLogGroupsRequest
+ {
+ LogGroupNamePrefix = options.LogGroupName
+ };
+
+ var logGroups = await cloudWatchClient
+ .DescribeLogGroupsAsync(describeRequest);
+
+ var logGroup = logGroups
+ .LogGroups
+ .FirstOrDefault(lg => string.Equals(lg.LogGroupName, options.LogGroupName, StringComparison.Ordinal));
+
+ // create log group if it doesn't exist
+ if (logGroup == null)
+ {
+ var createRequest = new CreateLogGroupRequest(options.LogGroupName);
+ var createResponse = await cloudWatchClient.CreateLogGroupAsync(createRequest);
+
+ // update the retention policy if a specific period is defined
+ if (options.LogGroupRetentionPolicy != LogGroupRetentionPolicy.Indefinitely)
+ {
+ var putRetentionRequest = new PutRetentionPolicyRequest(options.LogGroupName, (int)options.LogGroupRetentionPolicy);
+ await cloudWatchClient.PutRetentionPolicyAsync(putRetentionRequest);
+ }
+ }
+ }
+ }
+
+ ///
+ /// Updates the name of the log stream.
+ ///
+ private void UpdateLogStreamName()
+ {
+ logStreamName = options.LogStreamNameProvider.GetLogStreamName();
+ nextSequenceToken = null; // always reset on a new stream
+ }
+
+ ///
+ /// Creates the log stream if needed.
+ ///
+ private async Task CreateLogStreamAsync()
+ {
+ // see if the log stream already exists
+ var logStream = await GetLogStreamAsync();
+
+ // create log stream if it doesn't exist
+ if (logStream == null)
+ {
+ var createLogStreamRequest = new CreateLogStreamRequest
+ {
+ LogGroupName = options.LogGroupName,
+ LogStreamName = logStreamName
+ };
+ var createLogStreamResponse = await cloudWatchClient.CreateLogStreamAsync(createLogStreamRequest);
+ }
+ else
+ {
+ nextSequenceToken = logStream.UploadSequenceToken;
+ }
+ }
+
+ ///
+ /// Updates the log stream sequence token.
+ ///
+ private async Task UpdateLogStreamSequenceTokenAsync()
+ {
+ var logStream = await GetLogStreamAsync();
+ nextSequenceToken = logStream?.UploadSequenceToken;
+ }
+
+ ///
+ /// Attempts to get the log stream defined by .
+ ///
+ /// The matching log stream or null if no match can be found.
+ private async Task GetLogStreamAsync()
+ {
+ var describeLogStreamsRequest = new DescribeLogStreamsRequest
+ {
+ LogGroupName = options.LogGroupName,
+ LogStreamNamePrefix = logStreamName
+ };
+
+ var describeLogStreamsResponse = await cloudWatchClient
+ .DescribeLogStreamsAsync(describeLogStreamsRequest);
+
+ return describeLogStreamsResponse
+ .LogStreams
+ .SingleOrDefault(ls => string.Equals(ls.LogStreamName, logStreamName, StringComparison.Ordinal));
+ }
+
+ ///
+ /// Creates a batch of events.
+ ///
+ /// The entire set of log events.
+ /// A batch of events meeting defined restrictions.
+ private List CreateBatch(Queue logEvents)
+ {
+ DateTime? first = null;
+ var batchSize = 0;
+ var batch = new List();
+
+ while (batch.Count < CloudWatchLogSink.MaxLogEventBatchCount && logEvents.Count > 0) // ensure < max batch count
+ {
+ var @event = logEvents.Peek();
+
+ if (!first.HasValue)
+ {
+ first = @event.Timestamp;
+ }
+ else if (@event.Timestamp.Subtract(first.Value) > CloudWatchLogSink.MaxBatchEventSpan) // ensure batch spans no more than 24 hours
+ {
+ break;
+ }
+
+ var proposedBatchSize = batchSize + System.Text.Encoding.UTF8.GetByteCount(@event.Message) + CloudWatchLogSink.MessageBufferSize;
+ if (proposedBatchSize < CloudWatchLogSink.MaxLogEventBatchSize) // ensure < max batch size
+ {
+ batchSize = proposedBatchSize;
+ batch.Add(@event);
+ logEvents.Dequeue();
+ }
+ else
+ {
+ break;
+ }
+ }
+
+ return batch;
+ }
+
+ ///
+ /// Publish the batch of log events to AWS CloudWatch Logs.
+ ///
+ /// The request.
+ private async Task PublishBatchAsync(List batch)
+ {
+ if (batch?.Count == 0)
+ {
+ return;
+ }
+
+ var success = false;
+ var attemptIndex = 0;
+ while (!success && attemptIndex <= options.RetryAttempts)
+ {
+ try
+ {
+ // creates the request to upload a new event to CloudWatch
+ var putLogEventsRequest = new PutLogEventsRequest
+ {
+ LogGroupName = options.LogGroupName,
+ LogStreamName = logStreamName,
+ SequenceToken = nextSequenceToken,
+ LogEvents = batch
+ };
+
+ // actually upload the event to CloudWatch
+ var putLogEventsResponse = await cloudWatchClient.PutLogEventsAsync(putLogEventsRequest);
+
+ // remember the next sequence token, which is required
+ nextSequenceToken = putLogEventsResponse.NextSequenceToken;
+
+ success = true;
+ }
+ catch (ServiceUnavailableException e)
+ {
+ // retry with back-off
+ Debugging.SelfLog.WriteLine("Service unavailable. Attempt: {0} Error: {1}", attemptIndex, e);
+ await Task.Delay(CloudWatchLogSink.ErrorBackoffStartingInterval.Milliseconds * (int)Math.Pow(2, attemptIndex));
+ attemptIndex++;
+ }
+ catch (ResourceNotFoundException e)
+ {
+ // no retry with back-off because..
+ // if one of these fails, we get out of the loop.
+ // if they're both successful, we don't hit this case again.
+ Debugging.SelfLog.WriteLine("Resource was not found. Error: {0}", e);
+ await CreateLogGroupAsync();
+ await CreateLogStreamAsync();
+ }
+ catch (DataAlreadyAcceptedException e)
+ {
+ Debugging.SelfLog.WriteLine("Data already accepted. Attempt: {0} Error: {1}", attemptIndex, e);
+ try
+ {
+ await UpdateLogStreamSequenceTokenAsync();
+ }
+ catch (Exception ex)
+ {
+ Debugging.SelfLog.WriteLine("Unable to update log stream sequence. Attempt: {0} Error: {1}", attemptIndex, ex);
+
+ // try again with a different log stream
+ UpdateLogStreamName();
+ await CreateLogStreamAsync();
+ }
+ attemptIndex++;
+ }
+ catch (InvalidSequenceTokenException e)
+ {
+ Debugging.SelfLog.WriteLine("Invalid sequence token. Attempt: {0} Error: {1}", attemptIndex, e);
+ try
+ {
+ await UpdateLogStreamSequenceTokenAsync();
+ }
+ catch (Exception ex)
+ {
+ Debugging.SelfLog.WriteLine("Unable to update log stream sequence. Attempt: {0} Error: {1}", attemptIndex, ex);
+
+ // try again with a different log stream
+ UpdateLogStreamName();
+ await CreateLogStreamAsync();
+ }
+ attemptIndex++;
+ }
+ catch (Exception e)
+ {
+ Debugging.SelfLog.WriteLine("Unhandled exception. Error: {0}", e);
+ break;
+ }
+ }
+ }
+
+ ///
+ /// Emit a batch of log events, running asynchronously.
+ ///
+ /// The events to emit.
+ public async Task EmitBatchAsync(IEnumerable events)
+ {
+ try
+ {
+ await syncObject.WaitAsync();
+
+ if (events?.Count() == 0)
+ {
+ return;
+ }
+
+ try
+ {
+ await EnsureInitializedAsync();
+ }
+ catch (Exception ex)
+ {
+ Debugging.SelfLog.WriteLine("Error initializing log stream. No logs will be sent to AWS CloudWatch. Exception was {0}.", ex);
+ return;
+ }
+
+ try
+ {
+ var logEvents =
+ new Queue(events
+ .OrderBy(e => e.Timestamp) // log events need to be ordered by timestamp within a single bulk upload to CloudWatch
+ .Select( // transform
+ @event =>
+ {
+ string message = null;
+ using (var writer = new StringWriter())
+ {
+ textFormatter.Format(@event, writer);
+ writer.Flush();
+ message = writer.ToString();
+ }
+ var messageLength = Encoding.UTF8.GetByteCount(message);
+ if (messageLength > CloudWatchLogSink.MaxLogEventSize)
+ {
+ // truncate event message
+ Debugging.SelfLog.WriteLine("Truncating log event with length of {0}", messageLength);
+ var buffer = Encoding.UTF8.GetBytes(message);
+ message = Encoding.UTF8.GetString(buffer, 0, CloudWatchLogSink.MaxLogEventSize);
+ }
+ return new InputLogEvent
+ {
+ Message = message,
+ Timestamp = @event.Timestamp.UtcDateTime
+ };
+ }));
+
+ while (logEvents.Count > 0)
+ {
+ var batch = CreateBatch(logEvents);
+
+ await PublishBatchAsync(batch);
+ }
+ }
+ catch (Exception ex)
+ {
+ try
+ {
+ Debugging.SelfLog.WriteLine("Error sending logs. No logs will be sent to AWS CloudWatch. Error was {0}", ex);
+ }
+ catch
+ {
+ // we even failed to log to the trace logger - giving up trying to put something out
+ }
+ }
+ }
+ finally
+ {
+ syncObject.Release();
+ }
+ }
+
+ ///
+ public Task OnEmptyBatchAsync()
+ {
+ return Task.CompletedTask;
+ }
+}
\ No newline at end of file
diff --git a/test/Serilog.Sinks.AwsCloudWatch.Tests/CloudWatchLogsSinkTests.cs b/test/Serilog.Sinks.AwsCloudWatch.Tests/PeriodicBatchedSinkImplementationCallbackTests.cs
similarity index 97%
rename from test/Serilog.Sinks.AwsCloudWatch.Tests/CloudWatchLogsSinkTests.cs
rename to test/Serilog.Sinks.AwsCloudWatch.Tests/PeriodicBatchedSinkImplementationCallbackTests.cs
index 80ab7f4..cd13d19 100644
--- a/test/Serilog.Sinks.AwsCloudWatch.Tests/CloudWatchLogsSinkTests.cs
+++ b/test/Serilog.Sinks.AwsCloudWatch.Tests/PeriodicBatchedSinkImplementationCallbackTests.cs
@@ -15,12 +15,12 @@
namespace Serilog.Sinks.AwsCloudWatch.Tests
{
- public class CloudWatchLogsSinkTests
+ public class PeriodicBatchedSinkImplementationCallbackTests
{
private const string Alphabet = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789";
private static Random random = new Random((int)DateTime.Now.Ticks);
- public CloudWatchLogsSinkTests(ITestOutputHelper output)
+ public PeriodicBatchedSinkImplementationCallbackTests(ITestOutputHelper output)
{
// so we can inspect what will be output to selflog
Debugging.SelfLog.Enable(msg => output.WriteLine(msg));
@@ -36,7 +36,7 @@ public async Task SingleBatch()
var textFormatterMock = new Mock(MockBehavior.Strict);
textFormatterMock.Setup(s => s.Format(It.IsAny(), It.IsAny())).Callback((LogEvent l, TextWriter t) => l.RenderMessage(t));
var options = new CloudWatchSinkOptions { TextFormatter = textFormatterMock.Object, LogGroupName = "Test-Log-Group-Name" };
- var sink = new CloudWatchLogSink(client.Object, options);
+ var sink = new PeriodicBatchingSinkImplementationCallback(client.Object, options);
var events = Enumerable.Range(0, 10)
.Select(_ => // create 10 events with message length of 12
new LogEvent(
@@ -93,7 +93,7 @@ public async Task SingleBatch_LogGroupExists()
var textFormatterMock = new Mock(MockBehavior.Strict);
textFormatterMock.Setup(s => s.Format(It.IsAny(), It.IsAny())).Callback((LogEvent l, TextWriter t) => l.RenderMessage(t));
var options = new CloudWatchSinkOptions { LogGroupName = Guid.NewGuid().ToString(), TextFormatter = textFormatterMock.Object };
- var sink = new CloudWatchLogSink(client.Object, options);
+ var sink = new PeriodicBatchingSinkImplementationCallback(client.Object, options);
var events = Enumerable.Range(0, 10)
.Select(_ => // create 10 events with message length of 12
new LogEvent(
@@ -157,7 +157,7 @@ public async Task SingleBatch_WithoutCreatingLogGroup()
var textFormatterMock = new Mock(MockBehavior.Strict);
textFormatterMock.Setup(s => s.Format(It.IsAny(), It.IsAny())).Callback((LogEvent l, TextWriter t) => l.RenderMessage(t));
var options = new CloudWatchSinkOptions { CreateLogGroup = false, TextFormatter = textFormatterMock.Object, LogGroupName = "Test-Log-Group-Name" };
- var sink = new CloudWatchLogSink(client.Object, options);
+ var sink = new PeriodicBatchingSinkImplementationCallback(client.Object, options);
var events = Enumerable.Range(0, 10)
.Select(_ => // create 10 events with message length of 12
new LogEvent(
@@ -208,7 +208,7 @@ public async Task SingleBatch_LogStreamExists()
var textFormatterMock = new Mock(MockBehavior.Strict);
textFormatterMock.Setup(s => s.Format(It.IsAny(), It.IsAny())).Callback((LogEvent l, TextWriter t) => l.RenderMessage(t));
var options = new CloudWatchSinkOptions { LogGroupName = Guid.NewGuid().ToString(), LogStreamNameProvider = new NonUniqueLogStreamNameProvider(), TextFormatter = textFormatterMock.Object };
- var sink = new CloudWatchLogSink(client.Object, options);
+ var sink = new PeriodicBatchingSinkImplementationCallback(client.Object, options);
var events = Enumerable.Range(0, 10)
.Select(_ => // create 10 events with message length of 12
new LogEvent(
@@ -280,7 +280,7 @@ public async Task LargeMessage()
var textFormatterMock = new Mock(MockBehavior.Strict);
textFormatterMock.Setup(s => s.Format(It.IsAny(), It.IsAny())).Callback((LogEvent l, TextWriter t) => l.RenderMessage(t));
var options = new CloudWatchSinkOptions { TextFormatter = textFormatterMock.Object, LogGroupName = "Test-Log-Group-Name" };
- var sink = new CloudWatchLogSink(client.Object, options);
+ var sink = new PeriodicBatchingSinkImplementationCallback(client.Object, options);
var largeEventMessage = CreateMessage(CloudWatchLogSink.MaxLogEventSize + 1);
var events = new LogEvent[]
{
@@ -335,7 +335,7 @@ public async Task MultipleDays()
var textFormatterMock = new Mock(MockBehavior.Strict);
textFormatterMock.Setup(s => s.Format(It.IsAny(), It.IsAny())).Callback((LogEvent l, TextWriter t) => l.RenderMessage(t));
var options = new CloudWatchSinkOptions { TextFormatter = textFormatterMock.Object, LogGroupName = "Test-Log-Group-Name" };
- var sink = new CloudWatchLogSink(client.Object, options);
+ var sink = new PeriodicBatchingSinkImplementationCallback(client.Object, options);
var events = Enumerable.Range(0, 20)
.Select(i => // create multipe events with message length of 12
new LogEvent(
@@ -407,7 +407,7 @@ public async Task MoreThanMaxBatchCount()
var textFormatterMock = new Mock(MockBehavior.Strict);
textFormatterMock.Setup(s => s.Format(It.IsAny(), It.IsAny())).Callback((LogEvent l, TextWriter t) => l.RenderMessage(t));
var options = new CloudWatchSinkOptions { TextFormatter = textFormatterMock.Object, LogGroupName = "Test-Log-Group-Name" };
- var sink = new CloudWatchLogSink(client.Object, options);
+ var sink = new PeriodicBatchingSinkImplementationCallback(client.Object, options);
var events = Enumerable.Range(0, CloudWatchLogSink.MaxLogEventBatchCount + 1)
.Select(i =>
new LogEvent(
@@ -474,7 +474,7 @@ public async Task MoreThanMaxBatchSize()
var textFormatterMock = new Mock(MockBehavior.Strict);
textFormatterMock.Setup(s => s.Format(It.IsAny(), It.IsAny())).Callback((LogEvent l, TextWriter t) => l.RenderMessage(t));
var options = new CloudWatchSinkOptions { TextFormatter = textFormatterMock.Object, LogGroupName = "Test-Log-Group-Name" };
- var sink = new CloudWatchLogSink(client.Object, options);
+ var sink = new PeriodicBatchingSinkImplementationCallback(client.Object, options);
var events = Enumerable.Range(0, 256) // 256 4 KB messages matches our max batch size, but we want to test a "less nice" scenario, so we'll create 256 5 KB messages
.Select(i =>
new LogEvent(
@@ -541,7 +541,7 @@ public async Task ServiceUnavailable()
var textFormatterMock = new Mock(MockBehavior.Strict);
textFormatterMock.Setup(s => s.Format(It.IsAny(), It.IsAny())).Callback((LogEvent l, TextWriter t) => l.RenderMessage(t));
var options = new CloudWatchSinkOptions { TextFormatter = textFormatterMock.Object, LogGroupName = "Test-Log-Group-Name" };
- var sink = new CloudWatchLogSink(client.Object, options);
+ var sink = new PeriodicBatchingSinkImplementationCallback(client.Object, options);
var events = Enumerable.Range(0, 10)
.Select(_ => // create 10 events with message length of 12
new LogEvent(
@@ -594,7 +594,7 @@ public async Task ServiceUnavailable_WithEventualSuccess()
var textFormatterMock = new Mock(MockBehavior.Strict);
textFormatterMock.Setup(s => s.Format(It.IsAny(), It.IsAny())).Callback((LogEvent l, TextWriter t) => l.RenderMessage(t));
var options = new CloudWatchSinkOptions { TextFormatter = textFormatterMock.Object, LogGroupName = "Test-Log-Group-Name" };
- var sink = new CloudWatchLogSink(client.Object, options);
+ var sink = new PeriodicBatchingSinkImplementationCallback(client.Object, options);
var events = Enumerable.Range(0, 10)
.Select(_ => // create 10 events with message length of 12
new LogEvent(
@@ -636,7 +636,7 @@ public async Task ResourceNotFound()
var textFormatterMock = new Mock(MockBehavior.Strict);
textFormatterMock.Setup(s => s.Format(It.IsAny(), It.IsAny())).Callback((LogEvent l, TextWriter t) => l.RenderMessage(t));
var options = new CloudWatchSinkOptions { TextFormatter = textFormatterMock.Object, LogGroupName = "Test-Log-Group-Name" };
- var sink = new CloudWatchLogSink(client.Object, options);
+ var sink = new PeriodicBatchingSinkImplementationCallback(client.Object, options);
var events = Enumerable.Range(0, 10)
.Select(_ => // create 10 events with message length of 12
new LogEvent(
@@ -686,7 +686,7 @@ public async Task ResourceNotFound_CannotCreateResource()
var textFormatterMock = new Mock(MockBehavior.Strict);
textFormatterMock.Setup(s => s.Format(It.IsAny(), It.IsAny())).Callback((LogEvent l, TextWriter t) => l.RenderMessage(t));
var options = new CloudWatchSinkOptions { TextFormatter = textFormatterMock.Object, LogGroupName = "Test-Log-Group-Name" };
- var sink = new CloudWatchLogSink(client.Object, options);
+ var sink = new PeriodicBatchingSinkImplementationCallback(client.Object, options);
var events = Enumerable.Range(0, 10)
.Select(_ => // create 10 events with message length of 12
new LogEvent(
@@ -732,7 +732,7 @@ public async Task InvalidParameter()
var textFormatterMock = new Mock(MockBehavior.Strict);
textFormatterMock.Setup(s => s.Format(It.IsAny(), It.IsAny())).Callback((LogEvent l, TextWriter t) => l.RenderMessage(t));
var options = new CloudWatchSinkOptions { TextFormatter = textFormatterMock.Object, LogGroupName = "Test-Log-Group-Name" };
- var sink = new CloudWatchLogSink(client.Object, options);
+ var sink = new PeriodicBatchingSinkImplementationCallback(client.Object, options);
var events = Enumerable.Range(0, 10)
.Select(_ => // create 10 events with message length of 12
new LogEvent(
@@ -776,7 +776,7 @@ public async Task InvalidSequenceToken()
var textFormatterMock = new Mock(MockBehavior.Strict);
textFormatterMock.Setup(s => s.Format(It.IsAny(), It.IsAny())).Callback((LogEvent l, TextWriter t) => l.RenderMessage(t));
var options = new CloudWatchSinkOptions { TextFormatter = textFormatterMock.Object, LogGroupName = "Test-Log-Group-Name" };
- var sink = new CloudWatchLogSink(client.Object, options);
+ var sink = new PeriodicBatchingSinkImplementationCallback(client.Object, options);
var events = Enumerable.Range(0, 10)
.Select(_ => // create 10 events with message length of 12
new LogEvent(
@@ -828,7 +828,7 @@ public async Task InvalidSequenceToken_CannotUpdateSequenceToken()
var textFormatterMock = new Mock(MockBehavior.Strict);
textFormatterMock.Setup(s => s.Format(It.IsAny(), It.IsAny())).Callback((LogEvent l, TextWriter t) => l.RenderMessage(t));
var options = new CloudWatchSinkOptions { LogStreamNameProvider = logStreamNameProvider.Object, TextFormatter = textFormatterMock.Object, LogGroupName = "Test-Log-Group-Name" };
- var sink = new CloudWatchLogSink(client.Object, options);
+ var sink = new PeriodicBatchingSinkImplementationCallback(client.Object, options);
var events = Enumerable.Range(0, 10)
.Select(_ => // create 10 events with message length of 12
new LogEvent(
@@ -882,7 +882,7 @@ public async Task DataAlreadyAccepted()
var textFormatterMock = new Mock(MockBehavior.Strict);
textFormatterMock.Setup(s => s.Format(It.IsAny(), It.IsAny())).Callback((LogEvent l, TextWriter t) => l.RenderMessage(t));
var options = new CloudWatchSinkOptions { TextFormatter = textFormatterMock.Object, LogGroupName = "Test-Log-Group-Name" };
- var sink = new CloudWatchLogSink(client.Object, options);
+ var sink = new PeriodicBatchingSinkImplementationCallback(client.Object, options);
var events = Enumerable.Range(0, 10)
.Select(_ => // create 10 events with message length of 12
new LogEvent(