Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Reduce calls for creating IServiceScope and rename CreateScope() to CreateServiceScope() #622

Merged
merged 1 commit into from
Jun 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -75,9 +75,7 @@ protected override Task StopCoreAsync(CancellationToken cancellationToken)
Logger.SchedulingNotSupported();
}

using var scope = CreateScope();
var body = await SerializeAsync(scope: scope,
@event: @event,
var body = await SerializeAsync(@event: @event,
registration: registration,
cancellationToken: cancellationToken).ConfigureAwait(false);

Expand Down Expand Up @@ -111,14 +109,12 @@ protected override Task StopCoreAsync(CancellationToken cancellationToken)
Logger.SchedulingNotSupported();
}

using var scope = CreateScope();
var records = new List<PutRecordsRequestEntry>();

// work on each event
foreach (var @event in events)
{
var body = await SerializeAsync(scope: scope,
@event: @event,
var body = await SerializeAsync(@event: @event,
registration: registration,
cancellationToken: cancellationToken).ConfigureAwait(false);

Expand Down
18 changes: 6 additions & 12 deletions src/Tingle.EventBus.Transports.Amazon.Sqs/AmazonSqsTransport.cs
Original file line number Diff line number Diff line change
Expand Up @@ -95,9 +95,7 @@ protected override async Task StopCoreAsync(CancellationToken cancellationToken)
DateTimeOffset? scheduled = null,
CancellationToken cancellationToken = default)
{
using var scope = CreateScope();
var body = await SerializeAsync(scope: scope,
@event: @event,
var body = await SerializeAsync(@event: @event,
registration: registration,
cancellationToken: cancellationToken).ConfigureAwait(false);

Expand Down Expand Up @@ -159,7 +157,6 @@ protected override async Task StopCoreAsync(CancellationToken cancellationToken)
DateTimeOffset? scheduled = null,
CancellationToken cancellationToken = default)
{
using var scope = CreateScope();
var sequenceNumbers = new List<string>();

// log warning when trying to publish scheduled message to a topic
Expand All @@ -177,8 +174,7 @@ protected override async Task StopCoreAsync(CancellationToken cancellationToken)
// work on each event
foreach (var @event in events)
{
var body = await SerializeAsync(scope: scope,
@event: @event,
var body = await SerializeAsync(@event: @event,
registration: registration,
cancellationToken: cancellationToken).ConfigureAwait(false);

Expand All @@ -199,8 +195,7 @@ protected override async Task StopCoreAsync(CancellationToken cancellationToken)
var entries = new List<SendMessageBatchRequestEntry>(events.Count);
foreach (var @event in events)
{
var body = await SerializeAsync(scope: scope,
@event: @event,
var body = await SerializeAsync(@event: @event,
registration: registration,
cancellationToken: cancellationToken).ConfigureAwait(false);

Expand Down Expand Up @@ -368,10 +363,10 @@ private async Task ReceiveAsync(EventRegistration reg, EventConsumerRegistration
else
{
Logger.ReceivedMessages(messagesCount: messages.Count, queueUrl: queueUrl);
using var scope = CreateScope(); // shared
using var scope = CreateServiceScope(); // shared
foreach (var message in messages)
{
await OnMessageReceivedAsync(reg, ecr, queueUrl, message, cancellationToken).ConfigureAwait(false);
await OnMessageReceivedAsync(scope, reg, ecr, queueUrl, message, cancellationToken).ConfigureAwait(false);
}
}
}
Expand All @@ -388,7 +383,7 @@ private async Task ReceiveAsync(EventRegistration reg, EventConsumerRegistration
}
}

private async Task OnMessageReceivedAsync(EventRegistration reg, EventConsumerRegistration ecr, string queueUrl, Message message, CancellationToken cancellationToken)
private async Task OnMessageReceivedAsync(IServiceScope scope, EventRegistration reg, EventConsumerRegistration ecr, string queueUrl, Message message, CancellationToken cancellationToken)
{
var messageId = message.MessageId;
message.TryGetAttribute(MetadataNames.CorrelationId, out var correlationId);
Expand Down Expand Up @@ -419,7 +414,6 @@ private async Task OnMessageReceivedAsync(EventRegistration reg, EventConsumerRe
message.TryGetAttribute("Content-Type", out var contentType_str);
var contentType = contentType_str == null ? null : new ContentType(contentType_str);

using var scope = CreateScope();
var context = await DeserializeAsync(scope: scope,
body: new BinaryData(message.Body),
contentType: contentType,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,9 +98,7 @@ protected override async Task StopCoreAsync(CancellationToken cancellationToken)
Logger.ExpiryNotSupported();
}

using var scope = CreateScope();
var body = await SerializeAsync(scope: scope,
@event: @event,
var body = await SerializeAsync(@event: @event,
registration: registration,
cancellationToken: cancellationToken).ConfigureAwait(false);

Expand Down Expand Up @@ -150,12 +148,10 @@ protected override async Task StopCoreAsync(CancellationToken cancellationToken)
Logger.ExpiryNotSupported();
}

using var scope = CreateScope();
var eventDatas = new List<EventData>();
foreach (var @event in events)
{
var body = await SerializeAsync(scope: scope,
@event: @event,
var body = await SerializeAsync(@event: @event,
registration: registration,
cancellationToken: cancellationToken).ConfigureAwait(false);

Expand Down Expand Up @@ -387,7 +383,7 @@ private async Task OnEventReceivedAsync(EventRegistration reg, EventConsumerRegi
partitionId: partitionId,
partitionKey: data.PartitionKey,
sequenceNumber: data.SequenceNumber);
using var scope = CreateScope();
using var scope = CreateServiceScope(); // shared
var contentType = new ContentType(data.ContentType);
var context = await DeserializeAsync(scope: scope,
body: data.EventBody,
Expand Down Expand Up @@ -421,7 +417,7 @@ private async Task OnEventReceivedAsync(EventRegistration reg, EventConsumerRegi
await dlqProcessor.SendAsync(new[] { data }, cancellationToken).ConfigureAwait(false);
}

/*
/*
* Update the checkpoint store if needed so that the app receives
* only newer events the next time it's run.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,9 +85,7 @@ protected override async Task StopCoreAsync(CancellationToken cancellationToken)
DateTimeOffset? scheduled = null,
CancellationToken cancellationToken = default)
{
using var scope = CreateScope();
var body = await SerializeAsync(scope: scope,
@event: @event,
var body = await SerializeAsync(@event: @event,
registration: registration,
cancellationToken: cancellationToken).ConfigureAwait(false);

Expand Down Expand Up @@ -121,15 +119,12 @@ protected override async Task StopCoreAsync(CancellationToken cancellationToken)
// log warning when doing batch
Logger.BatchingNotSupported();

using var scope = CreateScope();

// work on each event
var sequenceNumbers = new List<string>();
var queueClient = await GetQueueClientAsync(reg: registration, deadletter: false, cancellationToken: cancellationToken).ConfigureAwait(false);
foreach (var @event in events)
{
var body = await SerializeAsync(scope: scope,
@event: @event,
var body = await SerializeAsync(@event: @event,
registration: registration,
cancellationToken: cancellationToken).ConfigureAwait(false);
// if scheduled for later, calculate the visibility timeout
Expand Down Expand Up @@ -269,10 +264,10 @@ private async Task ReceiveAsync(EventRegistration reg, EventConsumerRegistration
else
{
Logger.ReceivedMessages(messagesCount: messages.Length, queueName: queueClient.Name);
using var scope = CreateScope(); // shared
using var scope = CreateServiceScope(); // shared
foreach (var message in messages)
{
await OnMessageReceivedAsync(reg, ecr, queueClient, message, scope, cancellationToken).ConfigureAwait(false);
await OnMessageReceivedAsync(scope, reg, ecr, queueClient, message, cancellationToken).ConfigureAwait(false);
}
}
}
Expand All @@ -289,7 +284,7 @@ private async Task ReceiveAsync(EventRegistration reg, EventConsumerRegistration
}
}

private async Task OnMessageReceivedAsync(EventRegistration reg, EventConsumerRegistration ecr, QueueClient queueClient, QueueMessage message, IServiceScope scope, CancellationToken cancellationToken)
private async Task OnMessageReceivedAsync(IServiceScope scope, EventRegistration reg, EventConsumerRegistration ecr, QueueClient queueClient, QueueMessage message, CancellationToken cancellationToken)
{
var messageId = message.MessageId;
using var log_scope = BeginLoggingScopeForConsume(id: messageId,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,9 +121,7 @@ protected override async Task StopCoreAsync(CancellationToken cancellationToken)
DateTimeOffset? scheduled = null,
CancellationToken cancellationToken = default)
{
using var scope = CreateScope();
var body = await SerializeAsync(scope: scope,
@event: @event,
var body = await SerializeAsync(@event: @event,
registration: registration,
cancellationToken: cancellationToken).ConfigureAwait(false);

Expand Down Expand Up @@ -181,12 +179,10 @@ protected override async Task StopCoreAsync(CancellationToken cancellationToken)
DateTimeOffset? scheduled = null,
CancellationToken cancellationToken = default)
{
using var scope = CreateScope();
var messages = new List<ServiceBusMessage>();
foreach (var @event in events)
{
var body = await SerializeAsync(scope: scope,
@event: @event,
var body = await SerializeAsync(@event: @event,
registration: registration,
cancellationToken: cancellationToken).ConfigureAwait(false);

Expand Down Expand Up @@ -529,7 +525,7 @@ private async Task OnMessageReceivedAsync(EventRegistration reg, EventConsumerRe
activity?.AddTag(ActivityTagNames.MessagingDestinationKind, "queue"); // the spec does not know subscription so we can only use queue for both

Logger.ProcessingMessage(messageId: messageId, entityPath: entityPath);
using var scope = CreateScope();
using var scope = CreateServiceScope(); // shared
var contentType = new ContentType(message.ContentType);
var context = await DeserializeAsync(scope: scope,
body: message.Body,
Expand Down
10 changes: 3 additions & 7 deletions src/Tingle.EventBus.Transports.InMemory/InMemoryTransport.cs
Original file line number Diff line number Diff line change
Expand Up @@ -113,9 +113,7 @@ protected override async Task StopCoreAsync(CancellationToken cancellationToken)
Logger.SchedulingShortLived();
}

using var scope = CreateScope();
var body = await SerializeAsync(scope: scope,
@event: @event,
var body = await SerializeAsync(@event: @event,
registration: registration,
cancellationToken: cancellationToken).ConfigureAwait(false);

Expand Down Expand Up @@ -168,13 +166,11 @@ protected override async Task StopCoreAsync(CancellationToken cancellationToken)
Logger.SchedulingShortLived();
}

using var scope = CreateScope();
var messages = new List<InMemoryMessage>();

foreach (var @event in events)
{
var body = await SerializeAsync(scope: scope,
@event: @event,
var body = await SerializeAsync(@event: @event,
registration: registration,
cancellationToken: cancellationToken).ConfigureAwait(false);

Expand Down Expand Up @@ -340,7 +336,7 @@ private async Task OnMessageReceivedAsync(EventRegistration reg, EventConsumerRe
activity?.AddTag(ActivityTagNames.MessagingDestinationKind, "queue"); // the spec does not know subscription so we can only use queue for both

Logger.ProcessingMessage(messageId: messageId, entityPath: entityPath);
using var scope = CreateScope();
using var scope = CreateServiceScope(); // shared
var contentType = message.ContentType is not null ? new ContentType(message.ContentType) : null;
var context = await DeserializeAsync(scope: scope,
body: message.Body,
Expand Down
10 changes: 3 additions & 7 deletions src/Tingle.EventBus.Transports.Kafka/KafkaTransport.cs
Original file line number Diff line number Diff line change
Expand Up @@ -130,9 +130,7 @@ protected override async Task StopCoreAsync(CancellationToken cancellationToken)
Logger.ExpiryNotSupported();
}

using var scope = CreateScope();
var body = await SerializeAsync(scope: scope,
@event: @event,
var body = await SerializeAsync(@event: @event,
registration: registration,
cancellationToken: cancellationToken).ConfigureAwait(false);

Expand Down Expand Up @@ -178,14 +176,12 @@ protected override async Task StopCoreAsync(CancellationToken cancellationToken)
Logger.ExpiryNotSupported();
}

using var scope = CreateScope();
var sequenceNumbers = new List<long>();

// work on each event
foreach (var @event in events)
{
var body = await SerializeAsync(scope: scope,
@event: @event,
var body = await SerializeAsync(@event: @event,
registration: registration,
cancellationToken: cancellationToken).ConfigureAwait(false);

Expand Down Expand Up @@ -298,7 +294,7 @@ private async Task OnEventReceivedAsync(EventRegistration reg, EventConsumerRegi
topic: result.Topic,
partition: result.Partition,
offset: result.Offset);
using var scope = CreateScope();
using var scope = CreateServiceScope(); // shared
var contentType = contentType_str == null ? null : new ContentType(contentType_str);
var context = await DeserializeAsync(scope: scope,
body: new BinaryData(message.Value),
Expand Down
11 changes: 3 additions & 8 deletions src/Tingle.EventBus.Transports.RabbitMQ/RabbitMqTransport.cs
Original file line number Diff line number Diff line change
Expand Up @@ -85,9 +85,7 @@ protected override async Task StopCoreAsync(CancellationToken cancellationToken)
channel.ExchangeDeclare(exchange: name, type: "fanout");

// serialize the event
using var scope = CreateScope();
var body = await SerializeAsync(scope: scope,
@event: @event,
var body = await SerializeAsync(@event: @event,
registration: registration,
cancellationToken: cancellationToken).ConfigureAwait(false);

Expand Down Expand Up @@ -156,13 +154,10 @@ protected override async Task StopCoreAsync(CancellationToken cancellationToken)
var name = registration.EventName;
channel.ExchangeDeclare(exchange: name, type: "fanout");

using var scope = CreateScope();

var serializedEvents = new List<(EventContext<TEvent>, ContentType?, BinaryData)>();
foreach (var @event in events)
{
var body = await SerializeAsync(scope: scope,
@event: @event,
var body = await SerializeAsync(@event: @event,
registration: registration,
cancellationToken: cancellationToken).ConfigureAwait(false);
serializedEvents.Add((@event, @event.ContentType, body));
Expand Down Expand Up @@ -301,7 +296,7 @@ private async Task OnMessageReceivedAsync(EventRegistration reg, EventConsumerRe
activity?.AddTag(ActivityTagNames.MessagingDestinationKind, "queue"); // only queues are possible

Logger.LogDebug("Processing '{MessageId}'", messageId);
using var scope = CreateScope();
using var scope = CreateServiceScope();
var contentType = GetContentType(args.BasicProperties);
var context = await DeserializeAsync(scope: scope,
body: new BinaryData(args.Body),
Expand Down
Loading