Skip to content

Commit

Permalink
Add log record size estimation and limit buffer size in bytes
Browse files Browse the repository at this point in the history
Remove pooling for now
  • Loading branch information
evgenyfedorov2 committed Jan 13, 2025
1 parent 9d13ab0 commit fa52316
Show file tree
Hide file tree
Showing 19 changed files with 265 additions and 317 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,12 @@
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;
using Microsoft.Extensions.Diagnostics.Buffering;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Logging.Abstractions;
using Microsoft.Extensions.ObjectPool;
using Microsoft.Extensions.Options;
using Microsoft.Shared.Diagnostics;
using Microsoft.Shared.Pools;
using static Microsoft.Extensions.Logging.ExtendedLogger;

namespace Microsoft.AspNetCore.Diagnostics.Buffering;
Expand All @@ -22,10 +21,9 @@ internal sealed class HttpRequestBuffer : ILoggingBuffer
private readonly ConcurrentQueue<SerializedLogRecord> _buffer;
private readonly TimeProvider _timeProvider = TimeProvider.System;
private readonly IBufferedLogger _bufferedLogger;
private readonly object _bufferCapacityLocker = new();
private readonly ObjectPool<List<PooledLogRecord>> _logRecordPool = PoolFactory.CreateListPool<PooledLogRecord>();
private DateTimeOffset _truncateAfter;

private DateTimeOffset _lastFlushTimestamp;
private int _bufferSize;

public HttpRequestBuffer(IBufferedLogger bufferedLogger,
IOptionsMonitor<HttpRequestBufferOptions> options,
Expand All @@ -35,8 +33,6 @@ public HttpRequestBuffer(IBufferedLogger bufferedLogger,
_globalOptions = globalOptions;
_bufferedLogger = bufferedLogger;
_buffer = new ConcurrentQueue<SerializedLogRecord>();

_truncateAfter = _timeProvider.GetUtcNow();
}

public bool TryEnqueue<TState>(
Expand All @@ -52,66 +48,57 @@ public bool TryEnqueue<TState>(
return false;
}

switch (attributes)
SerializedLogRecord serializedLogRecord = default;
if (attributes is ModernTagJoiner modernTagJoiner)
{
case ModernTagJoiner modernTagJoiner:
_buffer.Enqueue(new SerializedLogRecord(logLevel, eventId, _timeProvider.GetUtcNow(), modernTagJoiner, exception,
((Func<ModernTagJoiner, Exception?, string>)(object)formatter)(modernTagJoiner, exception)));
break;
case LegacyTagJoiner legacyTagJoiner:
_buffer.Enqueue(new SerializedLogRecord(logLevel, eventId, _timeProvider.GetUtcNow(), legacyTagJoiner, exception,
((Func<LegacyTagJoiner, Exception?, string>)(object)formatter)(legacyTagJoiner, exception)));
break;
default:
Throw.ArgumentException(nameof(attributes), $"Unsupported type of the log attributes object detected: {typeof(TState)}");
break;
serializedLogRecord = new SerializedLogRecord(logLevel, eventId, _timeProvider.GetUtcNow(), modernTagJoiner, exception,
((Func<ModernTagJoiner, Exception?, string>)(object)formatter)(modernTagJoiner, exception));
}
else if (attributes is LegacyTagJoiner legacyTagJoiner)
{
serializedLogRecord = new SerializedLogRecord(logLevel, eventId, _timeProvider.GetUtcNow(), legacyTagJoiner, exception,
((Func<LegacyTagJoiner, Exception?, string>)(object)formatter)(legacyTagJoiner, exception));
}
else
{
Throw.ArgumentException(nameof(attributes), $"Unsupported type of the log attributes object detected: {typeof(TState)}");
}

var now = _timeProvider.GetUtcNow();
lock (_bufferCapacityLocker)
if (serializedLogRecord.SizeInBytes > _globalOptions.CurrentValue.LogRecordSizeInBytes)
{
if (now >= _truncateAfter)
{
_truncateAfter = now.Add(_options.CurrentValue.PerRequestDuration);
TruncateOverlimit();
}
return false;
}

_buffer.Enqueue(serializedLogRecord);
_ = Interlocked.Add(ref _bufferSize, serializedLogRecord.SizeInBytes);

Trim();

return true;
}

public void Flush()
{
var result = _buffer.ToArray();
_buffer.Clear();

_lastFlushTimestamp = _timeProvider.GetUtcNow();

List<PooledLogRecord>? pooledList = null;
try
{
pooledList = _logRecordPool.Get();
foreach (var serializedRecord in result)
{
pooledList.Add(
new PooledLogRecord(
serializedRecord.Timestamp,
serializedRecord.LogLevel,
serializedRecord.EventId,
serializedRecord.Exception,
serializedRecord.FormattedMessage,
serializedRecord.Attributes));
}

_bufferedLogger.LogRecords(pooledList);
}
finally
SerializedLogRecord[] bufferedRecords = _buffer.ToArray();

_buffer.Clear();

var deserializedLogRecords = new List<DeserializedLogRecord>(bufferedRecords.Length);
foreach (var bufferedRecord in bufferedRecords)
{
if (pooledList is not null)
{
_logRecordPool.Return(pooledList);
}
deserializedLogRecords.Add(
new DeserializedLogRecord(
bufferedRecord.Timestamp,
bufferedRecord.LogLevel,
bufferedRecord.EventId,
bufferedRecord.Exception,
bufferedRecord.FormattedMessage,
bufferedRecord.Attributes));
}

_bufferedLogger.LogRecords(deserializedLogRecords);
}

public bool IsEnabled(string category, LogLevel logLevel, EventId eventId)
Expand All @@ -126,12 +113,11 @@ public bool IsEnabled(string category, LogLevel logLevel, EventId eventId)
return rule is not null;
}

public void TruncateOverlimit()
private void Trim()
{
// Capacity is a soft limit, which might be exceeded, esp. in multi-threaded environments.
while (_buffer.Count > _options.CurrentValue.PerRequestCapacity)
while (_bufferSize > _options.CurrentValue.PerRequestBufferSizeInBytes && _buffer.TryDequeue(out var item))
{
_ = _buffer.TryDequeue(out _);
_ = Interlocked.Add(ref _bufferSize, -item.SizeInBytes);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,7 @@ public static ILoggingBuilder AddHttpRequestBuffering(this ILoggingBuilder build
return builder
.AddHttpRequestBufferConfiguration(configuration)
.AddHttpRequestBufferManager()
.AddGlobalBufferConfiguration(configuration)
.AddGlobalBufferManager();
.AddGlobalBuffer(configuration);
}

/// <summary>
Expand All @@ -61,8 +60,7 @@ public static ILoggingBuilder AddHttpRequestBuffering(this ILoggingBuilder build

return builder
.AddHttpRequestBufferManager()
.AddGlobalBuffer(level)
.AddGlobalBufferManager();
.AddGlobalBuffer(level);
}

/// <summary>
Expand All @@ -75,11 +73,9 @@ internal static ILoggingBuilder AddHttpRequestBufferManager(this ILoggingBuilder
{
_ = Throw.IfNull(builder);

builder.Services.TryAddSingleton<IHttpContextAccessor, HttpContextAccessor>();

builder.Services.TryAddSingleton<ExtendedLoggerFactory>();
builder.Services.TryAddEnumerable(ServiceDescriptor.Singleton<ILoggerFactory, ExtendedLoggerFactory>(sp => sp.GetRequiredService<ExtendedLoggerFactory>()));
_ = builder.Services.AddExtendedLoggerFeactory();

builder.Services.TryAddSingleton<IHttpContextAccessor, HttpContextAccessor>();
builder.Services.TryAddSingleton<HttpRequestBufferManager>();
builder.Services.TryAddSingleton<IBufferManager>(static sp => sp.GetRequiredService<HttpRequestBufferManager>());
builder.Services.TryAddSingleton<IHttpRequestBufferManager>(static sp => sp.GetRequiredService<HttpRequestBufferManager>());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,13 @@ namespace Microsoft.AspNetCore.Diagnostics.Buffering;

internal sealed class HttpRequestBufferManager : IHttpRequestBufferManager
{
private readonly GlobalBufferManager _globalBufferManager;
private readonly IGlobalBufferManager _globalBufferManager;
private readonly IHttpContextAccessor _httpContextAccessor;
private readonly IOptionsMonitor<HttpRequestBufferOptions> _requestOptions;
private readonly IOptionsMonitor<GlobalBufferOptions> _globalOptions;

public HttpRequestBufferManager(
GlobalBufferManager globalBufferManager,
IGlobalBufferManager globalBufferManager,
IHttpContextAccessor httpContextAccessor,
IOptionsMonitor<HttpRequestBufferOptions> requestOptions,
IOptionsMonitor<GlobalBufferOptions> globalOptions)
Expand All @@ -29,29 +29,6 @@ public HttpRequestBufferManager(
_globalOptions = globalOptions;
}

public ILoggingBuffer CreateBuffer(IBufferedLogger bufferedLogger, string category)
{
var httpContext = _httpContextAccessor.HttpContext;
if (httpContext is null)
{
return _globalBufferManager.CreateBuffer(bufferedLogger, category);
}

if (!httpContext.Items.TryGetValue(category, out var buffer))
{
var httpRequestBuffer = new HttpRequestBuffer(bufferedLogger, _requestOptions, _globalOptions);
httpContext.Items[category] = httpRequestBuffer;
return httpRequestBuffer;
}

if (buffer is not ILoggingBuffer loggingBuffer)
{
throw new InvalidOperationException($"Unable to parse value of {buffer} of the {category}");
}

return loggingBuffer;
}

public void FlushNonRequestLogs() => _globalBufferManager.Flush();

public void FlushCurrentRequestLogs()
Expand All @@ -77,7 +54,24 @@ public bool TryEnqueue<TState>(
Exception? exception,
Func<TState, Exception?, string> formatter)
{
var buffer = CreateBuffer(bufferedLogger, category);
return buffer.TryEnqueue(logLevel, category, eventId, attributes, exception, formatter);
var httpContext = _httpContextAccessor.HttpContext;
if (httpContext is null)
{
return _globalBufferManager.TryEnqueue(bufferedLogger, logLevel, category, eventId, attributes, exception, formatter);
}

if (!httpContext.Items.TryGetValue(category, out var buffer))
{
var httpRequestBuffer = new HttpRequestBuffer(bufferedLogger, _requestOptions, _globalOptions);
httpContext.Items[category] = httpRequestBuffer;
buffer = httpRequestBuffer;
}

if (buffer is not ILoggingBuffer loggingBuffer)
{
throw new InvalidOperationException($"Unable to parse value of {buffer} of the {category}");
}

return loggingBuffer.TryEnqueue(logLevel, category, eventId, attributes, exception, formatter);
}
}
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.

using System;
using System.Collections.Generic;
using System.Diagnostics.CodeAnalysis;
using Microsoft.Extensions.Diagnostics.Buffering;
Expand All @@ -16,14 +15,10 @@ namespace Microsoft.AspNetCore.Diagnostics.Buffering;
public class HttpRequestBufferOptions
{
/// <summary>
/// Gets or sets the duration to check and remove the buffered items exceeding the <see cref="PerRequestCapacity"/>.
/// Gets or sets the size in bytes of the buffer for a request. If the buffer size exceeds this limit, the oldest buffered log records will be dropped.
/// </summary>
public TimeSpan PerRequestDuration { get; set; } = TimeSpan.FromSeconds(10);

/// <summary>
/// Gets or sets the size of the buffer for a request.
/// </summary>
public int PerRequestCapacity { get; set; } = 1_000;
/// TO DO: add validation.
public int PerRequestBufferSizeInBytes { get; set; } = 5_000_000;

#pragma warning disable CA1002 // Do not expose generic lists - List is necessary to be able to call .AddRange()
#pragma warning disable CA2227 // Collection properties should be read only - setter is necessary for options pattern
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,29 @@

using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Diagnostics.CodeAnalysis;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Logging.Abstractions;
using Microsoft.Extensions.ObjectPool;
using Microsoft.Shared.DiagnosticIds;

namespace Microsoft.Extensions.Diagnostics.Buffering;
internal sealed class PooledLogRecord : BufferedLogRecord, IResettable

/// <summary>
/// Represents a log record deserialized from somewhere, such as buffer.
/// </summary>
[Experimental(diagnosticId: DiagnosticIds.Experiments.Telemetry, UrlFormat = DiagnosticIds.UrlFormat)]
public sealed class DeserializedLogRecord : BufferedLogRecord
{
public PooledLogRecord(
/// <summary>
/// Initializes a new instance of the <see cref="DeserializedLogRecord"/> class.
/// </summary>
/// <param name="timestamp">The time when the log record was first created.</param>
/// <param name="logLevel">Logging severity level.</param>
/// <param name="eventId">Event ID.</param>
/// <param name="exception">An exception string for this record.</param>
/// <param name="formattedMessage">The formatted log message.</param>
/// <param name="attributes">The set of name/value pairs associated with the record.</param>
public DeserializedLogRecord(
DateTimeOffset timestamp,
LogLevel logLevel,
EventId eventId,
Expand All @@ -27,49 +41,27 @@ public PooledLogRecord(
_attributes = attributes;
}

/// <inheritdoc/>
public override DateTimeOffset Timestamp => _timestamp;
private DateTimeOffset _timestamp;

/// <inheritdoc/>
public override LogLevel LogLevel => _logLevel;
private LogLevel _logLevel;

/// <inheritdoc/>
public override EventId EventId => _eventId;
private EventId _eventId;

/// <inheritdoc/>
public override string? Exception => _exception;
private string? _exception;

public override ActivitySpanId? ActivitySpanId => _activitySpanId;
private ActivitySpanId? _activitySpanId;

public override ActivityTraceId? ActivityTraceId => _activityTraceId;
private ActivityTraceId? _activityTraceId;

public override int? ManagedThreadId => _managedThreadId;
private int? _managedThreadId;

/// <inheritdoc/>
public override string? FormattedMessage => _formattedMessage;
private string? _formattedMessage;

public override string? MessageTemplate => _messageTemplate;
private string? _messageTemplate;

/// <inheritdoc/>
public override IReadOnlyList<KeyValuePair<string, object?>> Attributes => _attributes;
private IReadOnlyList<KeyValuePair<string, object?>> _attributes;

public bool TryReset()
{
_timestamp = default;
_logLevel = default;
_eventId = default;
_exception = default;
_activitySpanId = default;
_activityTraceId = default;
_managedThreadId = default;
_formattedMessage = default;
_messageTemplate = default;
_attributes = [];

return true;
}
}
Loading

0 comments on commit fa52316

Please sign in to comment.