Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 4 additions & 15 deletions Rx.NET/Source/src/System.Reactive/AnonymousSafeObserver.cs
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,11 @@ namespace System.Reactive
/// that accept delegates for the On* handlers. By doing the fusion, we make the call stack depth shorter which
/// helps debugging and some performance.
/// </summary>
internal sealed class AnonymousSafeObserver<T> : ISafeObserver<T>
internal sealed class AnonymousSafeObserver<T> : SafeObserver<T>
{
private readonly Action<T> _onNext;
private readonly Action<Exception> _onError;
private readonly Action _onCompleted;
private IDisposable _disposable;

private int isStopped;

Expand All @@ -35,7 +34,7 @@ public AnonymousSafeObserver(Action<T> onNext, Action<Exception> onError, Action
_onCompleted = onCompleted;
}

public void OnNext(T value)
public override void OnNext(T value)
{
if (isStopped == 0)
{
Expand All @@ -53,7 +52,7 @@ public void OnNext(T value)
}
}

public void OnError(Exception error)
public override void OnError(Exception error)
{
if (Interlocked.Exchange(ref isStopped, 1) == 0)
{
Expand All @@ -64,7 +63,7 @@ public void OnError(Exception error)
}
}

public void OnCompleted()
public override void OnCompleted()
{
if (Interlocked.Exchange(ref isStopped, 1) == 0)
{
Expand All @@ -74,15 +73,5 @@ public void OnCompleted()
}
}
}

public void SetResource(IDisposable resource)
{
Disposable.SetSingle(ref _disposable, resource);
}

public void Dispose()
{
Disposable.TryDispose(ref _disposable);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,10 @@
// See the LICENSE file in the project root for more information.

using System.Reactive.Disposables;
using System.Threading;

namespace System.Reactive
{
internal sealed class AutoDetachObserver<T> : ObserverBase<T>
internal sealed class AutoDetachObserver<T> : ObserverBase<T>, ISafeObserver<T>
{
private readonly IObserver<T> _observer;

Expand All @@ -18,9 +17,9 @@ public AutoDetachObserver(IObserver<T> observer)
_observer = observer;
}

public IDisposable Disposable
public void SetResource(IDisposable resource)
{
set => Disposables.Disposable.SetSingle(ref _disposable, value);
Disposable.SetSingle(ref _disposable, resource);
}

protected override void OnNextCore(T value)
Expand Down
60 changes: 0 additions & 60 deletions Rx.NET/Source/src/System.Reactive/Internal/ObserverWithToken.cs

This file was deleted.

4 changes: 2 additions & 2 deletions Rx.NET/Source/src/System.Reactive/Internal/Producer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ public IDisposable SubscribeRaw(IObserver<TSource> observer, bool enableSafeguar
//
if (enableSafeguard)
{
var safeObserver = SafeObserver<TSource>.Create(observer);
var safeObserver = SafeObserver<TSource>.Wrap(observer);
safeObserver.SetResource(subscription);
observer = safeObserver;
}
Expand Down Expand Up @@ -100,7 +100,7 @@ public IDisposable SubscribeRaw(IObserver<TTarget> observer, bool enableSafeguar
//
if (enableSafeguard)
{
observer = safeObserver = SafeObserver<TTarget>.Create(observer);
observer = safeObserver = SafeObserver<TTarget>.Wrap(observer);
}

var sink = CreateSink(observer);
Expand Down
85 changes: 50 additions & 35 deletions Rx.NET/Source/src/System.Reactive/Internal/SafeObserver.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,70 +11,85 @@ namespace System.Reactive
// its implementation aspects.
//

internal sealed class SafeObserver<TSource> : ISafeObserver<TSource>
internal abstract class SafeObserver<TSource> : ISafeObserver<TSource>
{
private readonly IObserver<TSource> _observer;

private IDisposable _disposable;

public static ISafeObserver<TSource> Create(IObserver<TSource> observer)
private sealed class WrappingSafeObserver : SafeObserver<TSource>
{
if (observer is AnonymousObserver<TSource> a)
private readonly IObserver<TSource> _observer;

public WrappingSafeObserver(IObserver<TSource> observer)
{
return a.MakeSafe();
_observer = observer;
}
else

public override void OnNext(TSource value)
{
return new SafeObserver<TSource>(observer);
var __noError = false;
try
{
_observer.OnNext(value);
__noError = true;
}
finally
{
if (!__noError)
{
Dispose();
}
}
}
}

private SafeObserver(IObserver<TSource> observer)
{
_observer = observer;
}

public void OnNext(TSource value)
{
var __noError = false;
try
public override void OnError(Exception error)
{
_observer.OnNext(value);
__noError = true;
using (this)
{
_observer.OnError(error);
}
}
finally

public override void OnCompleted()
{
if (!__noError)
using (this)
{
Dispose();
_observer.OnCompleted();
}
}
}

public void OnError(Exception error)
public static ISafeObserver<TSource> Wrap(IObserver<TSource> observer)
{
using (this)
if (observer is AnonymousObserver<TSource> a)
{
_observer.OnError(error);
return a.MakeSafe();
}
}

public void OnCompleted()
{
using (this)
else
{
_observer.OnCompleted();
return new WrappingSafeObserver(observer);
}
}

private IDisposable _disposable;

public abstract void OnNext(TSource value);

public abstract void OnError(Exception error);

public abstract void OnCompleted();

public void SetResource(IDisposable resource)
{
Disposable.SetSingle(ref _disposable, resource);
}

public void Dispose()
{
Disposable.TryDispose(ref _disposable);
Dispose(true);
}

protected virtual void Dispose(bool disposing)
{
if (disposing)
Disposable.TryDispose(ref _disposable);
}
}
}
2 changes: 1 addition & 1 deletion Rx.NET/Source/src/System.Reactive/Observable.Extensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -259,7 +259,7 @@ private static void Subscribe_<T>(this IObservable<T> source, IObserver<T> obser
{
if (!token.IsCancellationRequested)
{
var consumer = new ObserverWithToken<T>(observer);
var consumer = SafeObserver<T>.Wrap(observer);

//
// [OK] Use of unsafe Subscribe: exception during Subscribe doesn't orphan CancellationTokenRegistration.
Expand Down
4 changes: 2 additions & 2 deletions Rx.NET/Source/src/System.Reactive/ObservableBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ public IDisposable Subscribe(IObserver<T> observer)
{
try
{
autoDetachObserver.Disposable = SubscribeCore(autoDetachObserver);
autoDetachObserver.SetResource(SubscribeCore(autoDetachObserver));
}
catch (Exception exception)
{
Expand All @@ -79,7 +79,7 @@ private IDisposable ScheduledSubscribe(IScheduler _, AutoDetachObserver<T> autoD
{
try
{
autoDetachObserver.Disposable = SubscribeCore(autoDetachObserver);
autoDetachObserver.SetResource(SubscribeCore(autoDetachObserver));
}
catch (Exception exception)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ public SerializableObservable(RemotableObservable<T> remotableObservable)

public IDisposable Subscribe(IObserver<T> observer)
{
var consumer = new ObserverWithToken<T>(observer);
var consumer = SafeObserver<T>.Wrap(observer);

//
// [OK] Use of unsafe Subscribe: non-pretentious transparent wrapping through remoting; exception coming from the remote object is not re-routed.
Expand Down