Skip to content

Commit

Permalink
Fixed schedulers for replay subjects
Browse files Browse the repository at this point in the history
  • Loading branch information
david-driscoll committed Apr 16, 2021
1 parent 0dae323 commit 3ddfa0f
Show file tree
Hide file tree
Showing 20 changed files with 401 additions and 20 deletions.
3 changes: 2 additions & 1 deletion src/Client/LanguageClientRegistrationManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Threading;
Expand All @@ -26,7 +27,7 @@ internal class LanguageClientRegistrationManager : IRegisterCapabilityHandler, I
private readonly ILspHandlerTypeDescriptorProvider _handlerTypeDescriptorProvider;
private readonly ILogger<LanguageClientRegistrationManager> _logger;
private readonly ConcurrentDictionary<string, Registration> _registrations;
private readonly ReplaySubject<IEnumerable<Registration>> _registrationSubject = new ReplaySubject<IEnumerable<Registration>>(1);
private readonly ReplaySubject<IEnumerable<Registration>> _registrationSubject = new ReplaySubject<IEnumerable<Registration>>(1, Scheduler.Immediate);

public LanguageClientRegistrationManager(
ISerializer serializer,
Expand Down
3 changes: 2 additions & 1 deletion src/Client/LanguageClientWorkspaceFoldersManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Threading;
Expand All @@ -25,7 +26,7 @@ public LanguageClientWorkspaceFoldersManager(IWorkspaceLanguageClient client, IE
{
_client = client;
_workspaceFolders = new ConcurrentDictionary<DocumentUri, WorkspaceFolder>(DocumentUri.Comparer);
_workspaceFoldersSubject = new ReplaySubject<IEnumerable<WorkspaceFolder>>(1);
_workspaceFoldersSubject = new ReplaySubject<IEnumerable<WorkspaceFolder>>(1, Scheduler.Immediate);

foreach (var folder in workspaceFolders)
{
Expand Down
3 changes: 2 additions & 1 deletion src/Dap.Client/ProgressObservable.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using System;
using System.Reactive.Concurrency;
using System.Reactive.Disposables;
using System.Reactive.Subjects;
using OmniSharp.Extensions.DebugAdapter.Protocol;
Expand All @@ -14,7 +15,7 @@ internal class ProgressObservable : IProgressObservable, IObserver<ProgressEvent

public ProgressObservable(ProgressToken token)
{
_dataSubject = new ReplaySubject<ProgressEvent>(1);
_dataSubject = new ReplaySubject<ProgressEvent>(1, Scheduler.Immediate);
_disposable = new CompositeDisposable { Disposable.Create(_dataSubject.OnCompleted) };

ProgressToken = token;
Expand Down
2 changes: 1 addition & 1 deletion src/JsonRpc/OutputHandler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ ILogger<OutputHandler> logger
_outputFilters = outputFilters.ToArray();
_logger = logger;
_queue = new Subject<object>();
_delayedQueue = new ReplaySubject<object>();
_delayedQueue = new ReplaySubject<object>(Scheduler.Immediate);
_outputIsFinished = new TaskCompletionSource<object?>();

_disposable = new CompositeDisposable {
Expand Down
4 changes: 2 additions & 2 deletions src/JsonRpc/ProcessScheduler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ IScheduler scheduler

var observableQueue =
new BehaviorSubject<(RequestProcessType type, ReplaySubject<IObservable<Unit>> observer, Subject<Unit>? contentModifiedSource)>(
( RequestProcessType.Serial, new ReplaySubject<IObservable<Unit>>(int.MaxValue), supportContentModified ? new Subject<Unit>() : null )
( RequestProcessType.Serial, new ReplaySubject<IObservable<Unit>>(int.MaxValue, Scheduler.Immediate), supportContentModified ? new Subject<Unit>() : null )
);

cd.Add(
Expand All @@ -52,7 +52,7 @@ IScheduler scheduler

logger.LogDebug("Completing existing request process type {Type}", observableQueue.Value.type);
observableQueue.Value.observer.OnCompleted();
observableQueue.OnNext(( item.type, new ReplaySubject<IObservable<Unit>>(int.MaxValue), supportContentModified ? new Subject<Unit>() : null ));
observableQueue.OnNext(( item.type, new ReplaySubject<IObservable<Unit>>(int.MaxValue, Scheduler.Immediate), supportContentModified ? new Subject<Unit>() : null ));
}

logger.LogDebug("Queueing {Type}:{Name} request for processing", item.type, item.name);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ Action onCompleteAction
)
{
_serializer = serializer;
_dataSubject = new ReplaySubject<IEnumerable<TItem>>(int.MaxValue);
_dataSubject = new ReplaySubject<IEnumerable<TItem>>(int.MaxValue, Scheduler.Immediate);
_disposable = new CompositeDisposable() { _dataSubject };

_task = Observable.Create<TResult>(
Expand Down
3 changes: 2 additions & 1 deletion src/Protocol/Progress/ProgressObservable.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using System;
using System.Reactive.Concurrency;
using System.Reactive.Disposables;
using System.Reactive.Linq;
using System.Reactive.Subjects;
Expand All @@ -16,7 +17,7 @@ internal class ProgressObservable<T> : IProgressObservable<T>, IObserver<JToken>
public ProgressObservable(ProgressToken token, Func<JToken, T> factory, Action disposal)
{
_factory = factory;
_dataSubject = new ReplaySubject<JToken>(1);
_dataSubject = new ReplaySubject<JToken>(1, Scheduler.Immediate);
_disposable = new CompositeDisposable { Disposable.Create(_dataSubject.OnCompleted), Disposable.Create(disposal) };

ProgressToken = token;
Expand Down
3 changes: 2 additions & 1 deletion src/Server/LanguageServerWorkspaceFolderManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Threading;
Expand All @@ -27,7 +28,7 @@ public LanguageServerWorkspaceFolderManager(IWorkspaceLanguageServer server)
{
_server = server;
_workspaceFolders = new ConcurrentDictionary<DocumentUri, WorkspaceFolder>(DocumentUri.Comparer);
_workspaceFoldersSubject = new ReplaySubject<IEnumerable<WorkspaceFolder>>(1);
_workspaceFoldersSubject = new ReplaySubject<IEnumerable<WorkspaceFolder>>(1, Scheduler.Immediate);
_workspaceFoldersChangedSubject = new Subject<WorkspaceFolderChange>();
}

Expand Down
3 changes: 3 additions & 0 deletions test/Lsp.Tests/Integration/DynamicRegistrationTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,8 @@ await TestHelper.DelayUntil(
registrations => registrations.Any(registration => SelectorMatches(registration, x => x.HasLanguage && x.Language == "vb")),
CancellationToken
);

await Task.Delay(200);
disposable.Dispose();


Expand All @@ -127,6 +129,7 @@ await TestHelper.DelayUntil(
registrations => !registrations.Any(registration => SelectorMatches(registration, x => x.HasLanguage && x.Language == "vb")),
CancellationToken
);
await Task.Delay(200);

client.RegistrationManager.CurrentRegistrations.Should().NotContain(
x =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -254,15 +254,15 @@ public async Task Should_Support_Options_Monitor()
// IOptionsMonitor<> is registered as a singleton, so this will update
options.CurrentValue.Host.Should().Be("localhost");
options.CurrentValue.Port.Should().Be(443);
sub.Received(1).Invoke(Arg.Any<BinderSourceUrl>());
sub.Received(Quantity.AtLeastOne()).Invoke(Arg.Any<BinderSourceUrl>());

configuration.Update("mysection", new Dictionary<string, string> { ["host"] = "127.0.0.1", ["port"] = "80" });
await options.WaitForChange(CancellationToken);
await SettleNext();

options.CurrentValue.Host.Should().Be("127.0.0.1");
options.CurrentValue.Port.Should().Be(80);
sub.Received(2).Invoke(Arg.Any<BinderSourceUrl>());
sub.Received(Quantity.Within(2, int.MaxValue)).Invoke(Arg.Any<BinderSourceUrl>());
}

class BinderSourceUrl
Expand Down
6 changes: 3 additions & 3 deletions test/Lsp.Tests/Integration/PartialItemTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ public Delegates(ITestOutputHelper testOutputHelper, LanguageProtocolFixture<Def
{
}

[Fact]
[RetryFact]
public async Task Should_Behave_Like_A_Task()
{
var result = await Client.TextDocument.RequestSemanticTokens(
Expand All @@ -34,7 +34,7 @@ public async Task Should_Behave_Like_A_Task()
result!.Data.Should().HaveCount(3);
}

[Fact]
[RetryFact]
public async Task Should_Behave_Like_An_Observable()
{
var items = await Client.TextDocument
Expand All @@ -53,7 +53,7 @@ public async Task Should_Behave_Like_An_Observable()
items.Select(z => z.Data.Length).Should().ContainInOrder(1, 2, 3);
}

[Fact]
[RetryFact]
public async Task Should_Behave_Like_An_Observable_Without_Progress_Support()
{
var response = await Client.SendRequest(new SemanticTokensParams { TextDocument = new TextDocumentIdentifier(@"c:\test.cs") }, CancellationToken);
Expand Down
8 changes: 4 additions & 4 deletions test/Lsp.Tests/Integration/PartialItemsTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ public Delegates(ITestOutputHelper testOutputHelper, LanguageProtocolFixture<Def
{
}

[Fact]
[RetryFact]
public async Task Should_Behave_Like_A_Task()
{
var result = await Client.TextDocument.RequestCodeLens(
Expand All @@ -42,7 +42,7 @@ public async Task Should_Behave_Like_A_Task()
result.Select(z => z.Command!.Name).Should().ContainInOrder("CodeLens 1", "CodeLens 2", "CodeLens 3");
}

[Fact]
[RetryFact]
public async Task Should_Behave_Like_An_Observable()
{
var items = await Client.TextDocument
Expand All @@ -63,7 +63,7 @@ public async Task Should_Behave_Like_An_Observable()
items.Select(z => z.Command!.Name).Should().ContainInOrder("CodeLens 1", "CodeLens 2", "CodeLens 3");
}

[Fact]
[RetryFact]
public async Task Should_Behave_Like_An_Observable_Without_Progress_Support()
{
var response = await Client.SendRequest(
Expand Down Expand Up @@ -120,7 +120,7 @@ public Handlers(ITestOutputHelper testOutputHelper, LanguageProtocolFixture<Defa
{
}

[Fact]
[RetryFact]
public async Task Should_Behave_Like_An_Observable_With_WorkDone()
{
var items = new List<CodeLens>();
Expand Down
3 changes: 1 addition & 2 deletions test/TestingUtils/AutoNSubstitute/TestExtensions.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
using System.Reactive.Concurrency;
using System.Threading;
using System.Threading;
using OmniSharp.Extensions.JsonRpc.Testing;
using Serilog.Events;
using Xunit.Abstractions;
Expand Down
53 changes: 53 additions & 0 deletions test/TestingUtils/RetryFactAttribute.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
// See https://github.com/JoshKeegan/xRetry

using System;
using System.Linq;
using Xunit;
using Xunit.Sdk;

namespace TestingUtils
{
/// <summary>
/// Attribute that is applied to a method to indicate that it is a fact that should be run
/// by the test runner up to MaxRetries times, until it succeeds.
/// </summary>
[XunitTestCaseDiscoverer("TestingUtils.RetryFactDiscoverer", "TestingUtils")]
[AttributeUsage(AttributeTargets.Method)]
public class RetryFactAttribute : FactAttribute
{
public readonly int MaxRetries;
public readonly int DelayBetweenRetriesMs;
public readonly SkipOnPlatform[] PlatformsToSkip;
private string? _skip;

/// <summary>
/// Ctor
/// </summary>
/// <param name="maxRetries">The number of times to run a test for until it succeeds</param>
/// <param name="delayBetweenRetriesMs">The amount of time (in ms) to wait between each test run attempt</param>
/// <param name="skipOn">platforms to skip testing on</param>
public RetryFactAttribute(int maxRetries = 5, int delayBetweenRetriesMs = 0, params SkipOnPlatform[] skipOn)
{
if (maxRetries < 1)
{
throw new ArgumentOutOfRangeException(nameof(maxRetries) + " must be >= 1");
}
if (delayBetweenRetriesMs < 0)
{
throw new ArgumentOutOfRangeException(nameof(delayBetweenRetriesMs) + " must be >= 0");
}

MaxRetries = !UnitTestDetector.IsCI() ? 1 : maxRetries;
DelayBetweenRetriesMs = delayBetweenRetriesMs;
PlatformsToSkip = skipOn;
}

public override string? Skip
{
get => UnitTestDetector.IsCI() && PlatformsToSkip.Any(UnitTestDetector.PlatformToSkipPredicate)
? "Skipped on platform" + ( string.IsNullOrWhiteSpace(_skip) ? "" : " because " + _skip )
: null;
set => _skip = value;
}
}
}
46 changes: 46 additions & 0 deletions test/TestingUtils/RetryFactDiscoverer.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
using System.Collections.Generic;
using System.Linq;
using Xunit.Abstractions;
using Xunit.Sdk;

namespace TestingUtils
{
public class RetryFactDiscoverer : IXunitTestCaseDiscoverer
{
private readonly IMessageSink _messageSink;

public RetryFactDiscoverer(IMessageSink messageSink)
{
_messageSink = messageSink;
}

public IEnumerable<IXunitTestCase> Discover(ITestFrameworkDiscoveryOptions discoveryOptions, ITestMethod testMethod,
IAttributeInfo factAttribute)
{
IXunitTestCase testCase;

if (testMethod.Method.GetParameters().Any())
{
testCase = new ExecutionErrorTestCase(_messageSink, discoveryOptions.MethodDisplayOrDefault(),
discoveryOptions.MethodDisplayOptionsOrDefault(), testMethod,
"[RetryFact] methods are not allowed to have parameters. Did you mean to use [RetryTheory]?");
}
else if (testMethod.Method.IsGenericMethodDefinition)
{
testCase = new ExecutionErrorTestCase(_messageSink, discoveryOptions.MethodDisplayOrDefault(),
discoveryOptions.MethodDisplayOptionsOrDefault(), testMethod,
"[RetryFact] methods are not allowed to be generic.");
}
else
{
var maxRetries = factAttribute.GetNamedArgument<int>(nameof(RetryFactAttribute.MaxRetries));
var delayBetweenRetriesMs =
factAttribute.GetNamedArgument<int>(nameof(RetryFactAttribute.DelayBetweenRetriesMs));
testCase = new RetryTestCase(_messageSink, discoveryOptions.MethodDisplayOrDefault(),
discoveryOptions.MethodDisplayOptionsOrDefault(), testMethod, maxRetries, delayBetweenRetriesMs);
}

return new[] { testCase };
}
}
}
60 changes: 60 additions & 0 deletions test/TestingUtils/RetryTestCase.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
using System;
using System.ComponentModel;
using System.Threading;
using System.Threading.Tasks;
using Xunit.Abstractions;
using Xunit.Sdk;

namespace TestingUtils
{
[Serializable]
public class RetryTestCase : XunitTestCase, IRetryableTestCase
{
public int MaxRetries { get; private set; }
public int DelayBetweenRetriesMs { get; private set; }

[EditorBrowsable(EditorBrowsableState.Never)]
[Obsolete(
"Called by the de-serializer; should only be called by deriving classes for de-serialization purposes", true)]
public RetryTestCase() { }

public RetryTestCase(
IMessageSink diagnosticMessageSink,
TestMethodDisplay defaultMethodDisplay,
TestMethodDisplayOptions defaultMethodDisplayOptions,
ITestMethod testMethod,
int maxRetries,
int delayBetweenRetriesMs,
object[]? testMethodArguments = null)
: base(diagnosticMessageSink, defaultMethodDisplay, defaultMethodDisplayOptions, testMethod,
testMethodArguments)
{
MaxRetries = maxRetries;
DelayBetweenRetriesMs = delayBetweenRetriesMs;
}

public override Task<RunSummary> RunAsync(IMessageSink diagnosticMessageSink, IMessageBus messageBus,
object[] constructorArguments, ExceptionAggregator aggregator,
CancellationTokenSource cancellationTokenSource) =>
RetryTestCaseRunner.RunAsync(this, diagnosticMessageSink, messageBus, cancellationTokenSource,
blockingMessageBus => new XunitTestCaseRunner(this, DisplayName, SkipReason, constructorArguments,
TestMethodArguments, blockingMessageBus, aggregator, cancellationTokenSource)
.RunAsync());

public override void Serialize(IXunitSerializationInfo data)
{
base.Serialize(data);

data.AddValue("MaxRetries", MaxRetries);
data.AddValue("DelayBetweenRetriesMs", DelayBetweenRetriesMs);
}

public override void Deserialize(IXunitSerializationInfo data)
{
base.Deserialize(data);

MaxRetries = data.GetValue<int>("MaxRetries");
DelayBetweenRetriesMs = data.GetValue<int>("DelayBetweenRetriesMs");
}
}
}
Loading

0 comments on commit 3ddfa0f

Please sign in to comment.