Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

4.x: Intercepting the chaining of operators #540

Closed
akarnokd opened this issue May 27, 2018 · 6 comments
Closed

4.x: Intercepting the chaining of operators #540

akarnokd opened this issue May 27, 2018 · 6 comments

Comments

@akarnokd
Copy link
Collaborator

akarnokd commented May 27, 2018

There is a plan to inline the IQueryLanguage interface and QueryLanguage class into the Observable static class (See #526 & #539) to save on an indirection and interface lookup. The IQueryLanguage has been always internal so in theory, nothing should depend on it or the original indirection.

In practice though, looks like there exist tools such as RxSpy that seem to latch onto this feature so that the original s_impl in every operator could be overridden via a proxy implementation of the IQueryLanguage interface. Obviously, if s_impl is no longer used, this and other similar tools will break.

I'm convinced that the performance benefit of the inlining is quite worth it, but supporting external diagnostic options is also considerable feature requirement.

I can think of two approaches that could re-enable the overriding of the standard implementations, but there are caveats.

1. Keep the IQueryLanguage interface

The IQueryLanguage interface is retained an implementation can be inserted globally that takes precedence. Example (via IQueryLanguageEx for brevity):

namespace System.Reactive.Linq
{
    /// <summary>
    /// Provides a set of static methods for writing in-memory queries over observable sequences.
    /// </summary>
    public static class ObservableEx
    {
        private static IQueryLanguageEx _override;

        /// <summary>
        /// Override the default implementation of operators.
        /// </summary>
        public static IQueryLanguageEx Override
        {
            get { return Volatile.Read(ref _override); }
            set { Interlocked.Exchange(ref _override, value);  }
        }

        [Experimental]
        public static IObservable<TResult> Create<TResult>(Func<IObserver<TResult>, IEnumerable<IObservable<object>>> iteratorMethod)
        {
            if (iteratorMethod == null)
                throw new ArgumentNullException(nameof(iteratorMethod));
// vvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvv
            var ovr = Override;
            if (ovr != null)
            {
                return ovr.Create(iteratorMethod);
            }
// ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
            return new AnonymousObservable<TResult>(observer =>
                iteratorMethod(observer).Concat().Subscribe(_ => { }, observer.OnError, observer.OnCompleted));
        }
    }
}

Concerns:

  • Will the property access be inlined, minimizing the overhead of this approach to a simple volatile read and an uncommon branch?
  • This requires a full implementation of the IQueryObservable interface and there is no way to delegate back to the standard implementation publicly.
  • Security concerns about who can hook override a chain.

2. Use type-specific operator-assembly hooks

The alternative, that RxJava also uses is to define function hooks that get called before an implementation is returned, allowing insertion of custom logic and/or complete replacement:

namespace System.Reactive.Linq
{
    /// <summary>
    /// Holds type specific assembly hooks to intercept
    /// the chaining of standard operators.
    /// </summary>
    /// <typeparam name="T">The element type of the observable returned.</typeparam>
    public static class QueryLanguageHooks<T>
    {
        private static Func<IObservable<T>, IObservable<T>> _onAssembly;

        public static Func<IObservable<T>, IObservable<T>> OnAssembly
        {
            get { return Volatile.Read(ref _onAssembly); }
            set { Interlocked.Exchange(ref _onAssembly, value); }
        }
        
        public static IObservable<T> Hook(IObservable<T> source)
        {
            var f = OnAssembly;
            if (f == null)
            {
                return source;
            }
            return f(source);
        }
    }

    /// <summary>
    /// Provides a set of static methods for writing in-memory queries over observable sequences.
    /// </summary>
    public static class ObservableEx
    {
        [Experimental]
        public static IObservable<TResult> Create<TResult>(Func<IObserver<TResult>, IEnumerable<IObservable<object>>> iteratorMethod)
        {
            if (iteratorMethod == null)
                throw new ArgumentNullException(nameof(iteratorMethod));
// vvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvv
            return QueryLanguageHooks<TResult>.Hook(
                new AnonymousObservable<TResult>(observer =>
                iteratorMethod(observer).Concat().Subscribe(_ => { }, observer.OnError, observer.OnCompleted))
            );
        }
    }
}

Concerns:

  • As I understand the struct/class generics, this approach requires the programmer do explicitly define typed callbacks for all possible types used in the host program: QueryLanguageHooks<int>.Hook = intobs -> intobs;
  • Using any standard method call on Observable inside the Func may lead to an infinite recursion.
  • Security concerns about who can hook into a chain.

(None of these is an issue with RxJava because Java currently only supports class-based generics so one can define an Observable<Object> hook only, the default implementations are semi-publicly available and finally there is a Java security manager infrastructure that can limit accesses so we don't have to deal with that at all.)

3. Keep both IQueryLanguage & QueryLanguage but do the override approach

Establish a static path via QueryLanguage.SomeMethod() instead of s_impl.Create:

namespace System.Reactive.Linq
{
    /// <summary>
    /// Provides a set of static methods for writing in-memory queries over observable sequences.
    /// </summary>
    public static class ObservableEx
    {
        private static IQueryLanguageEx _override;

        /// <summary>
        /// Override the default implementation of operators.
        /// </summary>
        public static IQueryLanguageEx Override
        {
            get { return Volatile.Read(ref _override); }
            set { Interlocked.Exchange(ref _override, value);  }
        }

        [Experimental]
        public static IObservable<TResult> Create<TResult>(Func<IObserver<TResult>, IEnumerable<IObservable<object>>> iteratorMethod)
        {
            if (iteratorMethod == null)
                throw new ArgumentNullException(nameof(iteratorMethod));
// vvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvv
            var ovr = Override;
            if (ovr != null)
            {
                return ovr.Create(iteratorMethod);
            }
// ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
            return QueryLanguage.Create(iteratorMethod);
        }
    }
}
@jcmm33
Copy link

jcmm33 commented May 29, 2018

My only real points would be:

  1. You mention that there are performance benefits, if so what sort of measured gain can be seen?
  2. Would workarounds not involving a replacement IQueryLanguage work on all platforms e.g. iOS & Android as well as the more commonplace ones?
  3. From my own perspective, I found being able to 'proxy' the IQueryLanguage very helpful indeed when working Android for diagnostics and debugging. It would be a shame to loose the capability and moreover it would be desirable to retain an official approach as to how to achieve this without the kludges.
  4. Perhaps with more official support the approach of getting to the internals and seeing what is being passed through could become one of the tools that people reach for when adding diagnostics to their applications during development.

@bartdesmet
Copy link
Collaborator

Background

The design of IQueryLanguage came from an early effort to support a debugger for Rx. The source history may reveal an [InternalsVisibleTo("System.Reactive.Debugger")] somewhere. Unfortunately, this effort was put on halt after shipping Rx v2.0 when the project was transferred to OSS and the prototype of the debugger wasn't moved over due to a plethora of external dependencies, but I can recover the design from memory more or less.

Essentially, it worked by decorating each operator in each reactive query expression using an Observable.Create wrapper providing hooks for Subscribe, Dispose, and each On* method on the observer, akin to this:

static IObservable<T> Intercept<T>(this IObservable<T> source, DebuggerHooks hooks)
{
  return Observable.Create<T>(observer =>
  {
    var d = hooks.Subscribe(source, observer); // internals calls observer.Intercept(hooks) to decorate the observer
    return hooks.Dispose(d);
  });
}

where hooks.* methods wrap the action provided in a try...catch...finally... statement to invoke BeforeX, AfterX, and FailX calls on the hook. In here X is any of the following: Subscribe, Dispose, OnNext, OnError, OnCompleted (so 5 lifecycle events, each having 3 phases).

The DebuggerHooks object has events for each of these, which can get set by a debugger, e.g.

  • to "break on exception", a callback is attached for all FailX events,
  • to set a "breakpoint" on a . between two operators (really a pipe through which events flow), a callback is attached to all BeforeOn* events,
  • etc.

The operator topology was inferred for visualization purposes by means of handling the Subscribe and Dispose events on the hooks, correlating the observers to operators. E.g. when xs.Foo().Bar() is being subscribed to, the hooks around Bar see the observer that's being passed to Foo, thus establishing the relationship between both operators. That provided a similar visualization as the task debugger in Visual Studio (see Tasks Window).

Usage

I know of a few places where this functionality has been used. RxSpy was mentioned before on this thread and seems to be the right approach for a reactive debugger going forward. Internally over here, I can think of two efforts that have directly depended on this functionality for diagnostics and throttling (detecting hot nodes in query operator graphs and providing alerts or taking actions), but I'm not sure about these state of those and will try to find out.

Also, in earlier days, IQbservable<T> supported binding to IQueryLanguage instances when compiling queries, bypassing the whole static extension methods dance, while enabling to bind to different in-memory query processor implementations that share the same Rx language surface.

So, I think it'd be unfortunate to get rid of this functionality altogether, and IQueryLanguage has provided for the most fine-grained means to intercept operators, which is a useful property to retain (rather than having a single "intercept anything" mechanism).

Advice

I think it'd be worth for motivated community members to revamp this effort :-). I'm not sure if @niik would still be interested in his work, possibly bringing this project to the .NET Foundation as a tool that goes right with Rx. In fact, in a bigger scope for tooling-related projects, it wouldn't be a bad thing for Rx to include Roslyn analyzers to provide refactorings (e.g. on an event, provide a Create FromEvent wrapper, etc.) and analyzers (e.g. detect common mistakes).

Though I'm biased towards the original design, I do believe that IQueryLanguage as it exists today has the right granularity and the right place in the layer map, right in between the language projection (i.e. extension methods) and the query operator implementations. The removal of the System.Reactive.PlatformServices stuff has led to all functionality being flattened in one assembly, making it hard to appreciate this original placement as a means to separate one particular System.Reactive.Linq implementation from alternatives, with an interceptor in the middle (also supporting dynamic assembly loading of various "enlightenment modules", initially to deal with the differences between target .NET versions while keeping all System.Reactive.* assemblies other than System.Reactive.PlatformServices platform-agnostic and portable).

I'm also not sure I buy in to the performance argument, given that these are high-level control plane operations that only run during composition of query operators. That is, they don't run for Subscribe/Dispose lifecycle operations, nor for event flow in the data plane. The only place where they end up in the data plane is when queries are being composed within a selector such as SelectMany and it's unclear to me this would really move the needle on performance (compared to all the heavy lifting inside operators such as IDisposable allocations), so I wouldn't kill functionality without hard performance numbers. (In fact, work in CoreCLR's JIT to provide more effective call devirtualization may even be effective here, but I'd have to spend some time to do a thorough analysis.) In Rx, we've always tried to reduce cost in the data flow plane by pushing cost to the control plane when possible.

@niik
Copy link

niik commented Apr 11, 2019

I'm not sure if @niik would still be interested in his work, possibly bringing this project to the .NET Foundation as a tool that goes right with Rx.

I'd be delighted to see RxSpy evolve alongside with the ecosystem. I firmly believe that visualisation, debugging, and intelligent analysis of reactive flows is indispensable when working with large scale reactive applications and I think it would be a bummer if the ability to innovate in that space would diminish.

I wrote RxSpy to explore what reactive analysis tooling could look like. In hindsight it was clear that live analysis, while being an excellent tool for teaching Rx, and one I used several times to explain the concepts, was likely not going to be as powerful as sampling, replaying and analysing flows after the fact. I started toying with that in niik/RxSpy#25 but never completed it.

That said I'm afraid my work has lead me down other paths and I haven't been able to keep up with Rx for quite some time now. While I'd be glad to help I'm afraid my bandwidth is quite constrained these days.

@VistianOpenSource
Copy link

I took the work @niik had done and adjusted it to work on Android, not for visualization per se, but to get better diagnostics when things didn't work. See https://github.com/VistianOpenSource/Vistian.Reactive.Proxy .

@bartdesmet
Copy link
Collaborator

Cool, thanks for chiming in @niik and @VistianOpenSource. I think it's pretty clear that this mechanism is something we should keep and build on going forward. The main difference for implementation techniques on newer .NET platforms may be the lack of transparent proxy stuff, but it'd be fairly trivial to auto-generate an interceptor IQueryLanguage implementation at compile-time by reflecting on the interface and generating code using some T4 file.

We'll keep this issue as a coat hanger for future discussions.

@akarnokd
Copy link
Collaborator Author

Looks like the current setup is here to stay ever since. Closing.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

No branches or pull requests

5 participants