Skip to content

Commit

Permalink
Fixed schedulers for replay subjects
Browse files Browse the repository at this point in the history
  • Loading branch information
david-driscoll committed Apr 16, 2021
1 parent 0dae323 commit 74660a7
Show file tree
Hide file tree
Showing 24 changed files with 459 additions and 77 deletions.
3 changes: 2 additions & 1 deletion src/Client/LanguageClientRegistrationManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Threading;
Expand All @@ -26,7 +27,7 @@ internal class LanguageClientRegistrationManager : IRegisterCapabilityHandler, I
private readonly ILspHandlerTypeDescriptorProvider _handlerTypeDescriptorProvider;
private readonly ILogger<LanguageClientRegistrationManager> _logger;
private readonly ConcurrentDictionary<string, Registration> _registrations;
private readonly ReplaySubject<IEnumerable<Registration>> _registrationSubject = new ReplaySubject<IEnumerable<Registration>>(1);
private readonly ReplaySubject<IEnumerable<Registration>> _registrationSubject = new ReplaySubject<IEnumerable<Registration>>(1, Scheduler.Immediate);

public LanguageClientRegistrationManager(
ISerializer serializer,
Expand Down
3 changes: 2 additions & 1 deletion src/Client/LanguageClientWorkspaceFoldersManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Threading;
Expand All @@ -25,7 +26,7 @@ public LanguageClientWorkspaceFoldersManager(IWorkspaceLanguageClient client, IE
{
_client = client;
_workspaceFolders = new ConcurrentDictionary<DocumentUri, WorkspaceFolder>(DocumentUri.Comparer);
_workspaceFoldersSubject = new ReplaySubject<IEnumerable<WorkspaceFolder>>(1);
_workspaceFoldersSubject = new ReplaySubject<IEnumerable<WorkspaceFolder>>(1, Scheduler.Immediate);

foreach (var folder in workspaceFolders)
{
Expand Down
3 changes: 2 additions & 1 deletion src/Dap.Client/ProgressObservable.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using System;
using System.Reactive.Concurrency;
using System.Reactive.Disposables;
using System.Reactive.Subjects;
using OmniSharp.Extensions.DebugAdapter.Protocol;
Expand All @@ -14,7 +15,7 @@ internal class ProgressObservable : IProgressObservable, IObserver<ProgressEvent

public ProgressObservable(ProgressToken token)
{
_dataSubject = new ReplaySubject<ProgressEvent>(1);
_dataSubject = new ReplaySubject<ProgressEvent>(1, Scheduler.Immediate);
_disposable = new CompositeDisposable { Disposable.Create(_dataSubject.OnCompleted) };

ProgressToken = token;
Expand Down
4 changes: 2 additions & 2 deletions src/JsonRpc/ProcessScheduler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ IScheduler scheduler

var observableQueue =
new BehaviorSubject<(RequestProcessType type, ReplaySubject<IObservable<Unit>> observer, Subject<Unit>? contentModifiedSource)>(
( RequestProcessType.Serial, new ReplaySubject<IObservable<Unit>>(int.MaxValue), supportContentModified ? new Subject<Unit>() : null )
( RequestProcessType.Serial, new ReplaySubject<IObservable<Unit>>(int.MaxValue, Scheduler.Immediate), supportContentModified ? new Subject<Unit>() : null )
);

cd.Add(
Expand All @@ -52,7 +52,7 @@ IScheduler scheduler

logger.LogDebug("Completing existing request process type {Type}", observableQueue.Value.type);
observableQueue.Value.observer.OnCompleted();
observableQueue.OnNext(( item.type, new ReplaySubject<IObservable<Unit>>(int.MaxValue), supportContentModified ? new Subject<Unit>() : null ));
observableQueue.OnNext(( item.type, new ReplaySubject<IObservable<Unit>>(int.MaxValue, Scheduler.Immediate), supportContentModified ? new Subject<Unit>() : null ));
}

logger.LogDebug("Queueing {Type}:{Name} request for processing", item.type, item.name);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ Action onCompleteAction
)
{
_serializer = serializer;
_dataSubject = new ReplaySubject<IEnumerable<TItem>>(int.MaxValue);
_dataSubject = new ReplaySubject<IEnumerable<TItem>>(int.MaxValue, Scheduler.Immediate);
_disposable = new CompositeDisposable() { _dataSubject };

_task = Observable.Create<TResult>(
Expand Down
3 changes: 2 additions & 1 deletion src/Protocol/Progress/ProgressObservable.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using System;
using System.Reactive.Concurrency;
using System.Reactive.Disposables;
using System.Reactive.Linq;
using System.Reactive.Subjects;
Expand All @@ -16,7 +17,7 @@ internal class ProgressObservable<T> : IProgressObservable<T>, IObserver<JToken>
public ProgressObservable(ProgressToken token, Func<JToken, T> factory, Action disposal)
{
_factory = factory;
_dataSubject = new ReplaySubject<JToken>(1);
_dataSubject = new ReplaySubject<JToken>(1, Scheduler.Immediate);
_disposable = new CompositeDisposable { Disposable.Create(_dataSubject.OnCompleted), Disposable.Create(disposal) };

ProgressToken = token;
Expand Down
13 changes: 7 additions & 6 deletions src/Protocol/Progress/RequestProgressObservable.cs
Original file line number Diff line number Diff line change
Expand Up @@ -41,12 +41,13 @@ Action onCompleteAction
observer => new CompositeDisposable() {
_dataSubject
.ForkJoin(
requestResult.Do(
_ => {
if (_receivedPartialData) return;
_dataSubject.OnNext(reverseFactory(_));
}, OnError, OnCompleted
), factory
requestResult
.Do(
_ => _dataSubject.OnNext(reverseFactory(_)),
_dataSubject.OnError,
_dataSubject.OnCompleted
),
factory
)
.Subscribe(observer),
Disposable.Create(onCompleteAction)
Expand Down
3 changes: 2 additions & 1 deletion src/Server/LanguageServerWorkspaceFolderManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Threading;
Expand All @@ -27,7 +28,7 @@ public LanguageServerWorkspaceFolderManager(IWorkspaceLanguageServer server)
{
_server = server;
_workspaceFolders = new ConcurrentDictionary<DocumentUri, WorkspaceFolder>(DocumentUri.Comparer);
_workspaceFoldersSubject = new ReplaySubject<IEnumerable<WorkspaceFolder>>(1);
_workspaceFoldersSubject = new ReplaySubject<IEnumerable<WorkspaceFolder>>(1, Scheduler.Immediate);
_workspaceFoldersChangedSubject = new Subject<WorkspaceFolderChange>();
}

Expand Down
3 changes: 3 additions & 0 deletions test/Lsp.Tests/Integration/DynamicRegistrationTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,8 @@ await TestHelper.DelayUntil(
registrations => registrations.Any(registration => SelectorMatches(registration, x => x.HasLanguage && x.Language == "vb")),
CancellationToken
);

await Task.Delay(200);
disposable.Dispose();


Expand All @@ -127,6 +129,7 @@ await TestHelper.DelayUntil(
registrations => !registrations.Any(registration => SelectorMatches(registration, x => x.HasLanguage && x.Language == "vb")),
CancellationToken
);
await Task.Delay(200);

client.RegistrationManager.CurrentRegistrations.Should().NotContain(
x =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -254,15 +254,15 @@ public async Task Should_Support_Options_Monitor()
// IOptionsMonitor<> is registered as a singleton, so this will update
options.CurrentValue.Host.Should().Be("localhost");
options.CurrentValue.Port.Should().Be(443);
sub.Received(1).Invoke(Arg.Any<BinderSourceUrl>());
sub.Received(Quantity.AtLeastOne()).Invoke(Arg.Any<BinderSourceUrl>());

configuration.Update("mysection", new Dictionary<string, string> { ["host"] = "127.0.0.1", ["port"] = "80" });
await options.WaitForChange(CancellationToken);
await SettleNext();

options.CurrentValue.Host.Should().Be("127.0.0.1");
options.CurrentValue.Port.Should().Be(80);
sub.Received(2).Invoke(Arg.Any<BinderSourceUrl>());
sub.Received(Quantity.Within(2, int.MaxValue)).Invoke(Arg.Any<BinderSourceUrl>());
}

class BinderSourceUrl
Expand Down
6 changes: 3 additions & 3 deletions test/Lsp.Tests/Integration/PartialItemTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ public Delegates(ITestOutputHelper testOutputHelper, LanguageProtocolFixture<Def
{
}

[Fact]
[Fact]//[RetryFact]
public async Task Should_Behave_Like_A_Task()
{
var result = await Client.TextDocument.RequestSemanticTokens(
Expand All @@ -34,7 +34,7 @@ public async Task Should_Behave_Like_A_Task()
result!.Data.Should().HaveCount(3);
}

[Fact]
[Fact]//[RetryFact]
public async Task Should_Behave_Like_An_Observable()
{
var items = await Client.TextDocument
Expand All @@ -53,7 +53,7 @@ public async Task Should_Behave_Like_An_Observable()
items.Select(z => z.Data.Length).Should().ContainInOrder(1, 2, 3);
}

[Fact]
[Fact]//[RetryFact]
public async Task Should_Behave_Like_An_Observable_Without_Progress_Support()
{
var response = await Client.SendRequest(new SemanticTokensParams { TextDocument = new TextDocumentIdentifier(@"c:\test.cs") }, CancellationToken);
Expand Down
8 changes: 4 additions & 4 deletions test/Lsp.Tests/Integration/PartialItemsTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ public Delegates(ITestOutputHelper testOutputHelper, LanguageProtocolFixture<Def
{
}

[Fact]
[Fact]//[RetryFact]
public async Task Should_Behave_Like_A_Task()
{
var result = await Client.TextDocument.RequestCodeLens(
Expand All @@ -42,7 +42,7 @@ public async Task Should_Behave_Like_A_Task()
result.Select(z => z.Command!.Name).Should().ContainInOrder("CodeLens 1", "CodeLens 2", "CodeLens 3");
}

[Fact]
[Fact]//[RetryFact]
public async Task Should_Behave_Like_An_Observable()
{
var items = await Client.TextDocument
Expand All @@ -63,7 +63,7 @@ public async Task Should_Behave_Like_An_Observable()
items.Select(z => z.Command!.Name).Should().ContainInOrder("CodeLens 1", "CodeLens 2", "CodeLens 3");
}

[Fact]
[Fact]//[RetryFact]
public async Task Should_Behave_Like_An_Observable_Without_Progress_Support()
{
var response = await Client.SendRequest(
Expand Down Expand Up @@ -120,7 +120,7 @@ public Handlers(ITestOutputHelper testOutputHelper, LanguageProtocolFixture<Defa
{
}

[Fact]
[Fact]//[RetryFact]
public async Task Should_Behave_Like_An_Observable_With_WorkDone()
{
var items = new List<CodeLens>();
Expand Down
26 changes: 13 additions & 13 deletions test/Lsp.Tests/Integration/TypedCodeActionTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ public TypedCodeActionTests(ITestOutputHelper outputHelper) : base(new JsonRpcTe
{
}

[Fact]
[Fact]//[RetryFact]
public async Task Should_Aggregate_With_All_Related_Handlers()
{
var (client, _) = await Initialize(
Expand Down Expand Up @@ -157,7 +157,7 @@ public async Task Should_Aggregate_With_All_Related_Handlers()
actions.Length.Should().Be(3);
}

[Fact]
[Fact]//[RetryFact]
public async Task Should_Resolve_With_Data_Capability()
{
var (client, _) = await Initialize(
Expand Down Expand Up @@ -203,7 +203,7 @@ public async Task Should_Resolve_With_Data_Capability()
item.CodeAction!.Command!.Name.Should().Be("resolved");
}

[Fact]
[Fact]//[RetryFact]
public async Task Should_Resolve_With_Partial_Data_Capability()
{
var (client, _) = await Initialize(
Expand Down Expand Up @@ -248,7 +248,7 @@ public async Task Should_Resolve_With_Partial_Data_Capability()
item.CodeAction!.Command!.Name.Should().Be("resolved");
}

[Fact]
[Fact]//[RetryFact]
public async Task Should_Resolve_With_Data_CancellationToken()
{
var (client, _) = await Initialize(
Expand Down Expand Up @@ -294,7 +294,7 @@ public async Task Should_Resolve_With_Data_CancellationToken()
item.CodeAction!.Command!.Name.Should().Be("resolved");
}

[Fact]
[Fact]//[RetryFact]
public async Task Should_Resolve_With_Partial_Data_CancellationToken()
{
var (client, _) = await Initialize(
Expand Down Expand Up @@ -339,7 +339,7 @@ public async Task Should_Resolve_With_Partial_Data_CancellationToken()
item.CodeAction!.Command!.Name.Should().Be("resolved");
}

[Fact]
[Fact]//[RetryFact]
public async Task Should_Resolve_With_Data()
{
var (client, _) = await Initialize(
Expand Down Expand Up @@ -385,7 +385,7 @@ public async Task Should_Resolve_With_Data()
item.CodeAction!.Command!.Name.Should().Be("resolved");
}

[Fact]
[Fact]//[RetryFact]
public async Task Should_Resolve_With_Partial_Data()
{
var (client, _) = await Initialize(
Expand Down Expand Up @@ -431,7 +431,7 @@ public async Task Should_Resolve_With_Partial_Data()
}


[Fact]
[Fact]//[RetryFact]
public async Task Should_Resolve_Capability()
{
var (client, _) = await Initialize(
Expand Down Expand Up @@ -467,7 +467,7 @@ public async Task Should_Resolve_Capability()
item.CodeAction!.Command!.Name.Should().Be("resolved");
}

[Fact]
[Fact]//[RetryFact]
public async Task Should_Resolve_Partial_Capability()
{
var (client, _) = await Initialize(
Expand Down Expand Up @@ -502,7 +502,7 @@ public async Task Should_Resolve_Partial_Capability()
item.CodeAction!.Command!.Name.Should().Be("resolved");
}

[Fact]
[Fact]//[RetryFact]
public async Task Should_Resolve_CancellationToken()
{
var (client, _) = await Initialize(
Expand Down Expand Up @@ -538,7 +538,7 @@ public async Task Should_Resolve_CancellationToken()
item.CodeAction!.Command!.Name.Should().Be("resolved");
}

[Fact]
[Fact]//[RetryFact]
public async Task Should_Resolve_Partial_CancellationToken()
{
var (client, _) = await Initialize(
Expand Down Expand Up @@ -573,7 +573,7 @@ public async Task Should_Resolve_Partial_CancellationToken()
item.CodeAction!.Command!.Name.Should().Be("resolved");
}

[Fact]
[Fact]//[RetryFact]
public async Task Should_Resolve()
{
var (client, _) = await Initialize(
Expand Down Expand Up @@ -609,7 +609,7 @@ public async Task Should_Resolve()
item.CodeAction!.Command!.Name.Should().Be("resolved");
}

[Fact]
[Fact]//[RetryFact]
public async Task Should_Resolve_Partial()
{
var (client, _) = await Initialize(
Expand Down
Loading

0 comments on commit 74660a7

Please sign in to comment.