Skip to content
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
115 changes: 76 additions & 39 deletions src/core/Akka.Tests/Pattern/CircuitBreakerSpec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
using System.Threading.Tasks;
using Akka.Pattern;
using Akka.TestKit;
using Akka.Tests.Util;
using Xunit;

namespace Akka.Tests.Pattern
Expand Down Expand Up @@ -226,27 +227,29 @@ public void Must_still_be_in_open_state_after_calling_fail_method()
public class AnAsynchronousCircuitBreakerThatIsClosed : CircuitBreakerSpecBase
{
[Fact(DisplayName = "An asynchronous circuit breaker that is closed should allow call through")]
public void Should_Allow_Call_Through( )
public async Task Should_Allow_Call_Through( )
{
var breaker = LongCallTimeoutCb( );
var result = breaker.Instance.WithCircuitBreaker( () => Task.Run( ( ) => SayTest( ) ) );
var result = await breaker.Instance.WithCircuitBreaker( () => Task.Run( ( ) => SayTest( ) ) );

Assert.Equal( SayTest( ), result.Result );
Assert.Equal( SayTest( ), result );
}

[Fact(DisplayName = "An asynchronous circuit breaker that is closed should increment failure count when call fails")]
public void Should_Increment_Failure_Count_When_Call_Fails( )
public async Task Should_Increment_Failure_Count_When_Call_Fails( )
{
var breaker = LongCallTimeoutCb( );

Assert.Equal(0, breaker.Instance.CurrentFailureCount);
Assert.True( InterceptExceptionType<TestException>( ( ) => breaker.Instance.WithCircuitBreaker( () => Task.Run( ( ) => ThrowException( ) ) ).Wait( AwaitTimeout ) ) );
Assert.True( await InterceptExceptionTypeAsync<TestException>( async () =>
await breaker.Instance.WithCircuitBreaker( async () =>
await Task.Run( ThrowException ) ).AwaitWithTimeout(AwaitTimeout) ) );
Assert.True( CheckLatch( breaker.OpenLatch ) );
Assert.Equal( 1, breaker.Instance.CurrentFailureCount );
}

[Fact(DisplayName = "An asynchronous circuit breaker that is closed should reset failure count when call succeeds after failure")]
public void Should_Reset_Failure_Count_When_Call_Succeeds_After_Failure( )
public async Task Should_Reset_Failure_Count_When_Call_Succeeds_After_Failure( )
{
var breaker = MultiFailureCb( );

Expand All @@ -258,26 +261,28 @@ public void Should_Reset_Failure_Count_When_Call_Succeeds_After_Failure( )
, breaker.Instance.WithCircuitBreaker(() => Task.Factory.StartNew(ThrowException))
, breaker.Instance.WithCircuitBreaker(() => Task.Factory.StartNew(ThrowException)));

Assert.True( InterceptExceptionType<TestException>( ( ) => whenall.Wait( AwaitTimeout ) ) );
Assert.True( await InterceptExceptionTypeAsync<TestException>( async ( ) =>
await whenall.AwaitWithTimeout(AwaitTimeout) ) );

Assert.Equal(4, breaker.Instance.CurrentFailureCount);

var result = breaker.Instance.WithCircuitBreaker(() => Task.Run( ( ) => SayTest( ) ) ).Result;
var result = await breaker.Instance.WithCircuitBreaker(async () => await Task.Run( SayTest ) );

Assert.Equal( SayTest( ), result );
Assert.Equal( 0, breaker.Instance.CurrentFailureCount );
}

[Fact(DisplayName = "An asynchronous circuit breaker that is closed should increment failure count when call times out")]
public void Should_Increment_Failure_Count_When_Call_Times_Out( )
public async Task Should_Increment_Failure_Count_When_Call_Times_Out( )
{
var breaker = ShortCallTimeoutCb( );

breaker.Instance.WithCircuitBreaker( () => Task.Factory.StartNew( ( ) =>
{
Thread.Sleep( 500 );
return SayTest( );
} ) );
await breaker.Instance.WithCircuitBreaker( async () =>
await Task.Run( async () =>
{
await Task.Delay(500);
return SayTest( );
} ));

Assert.True( CheckLatch( breaker.OpenLatch ) );
Assert.Equal( 1, breaker.Instance.CurrentFailureCount );
Expand All @@ -288,28 +293,33 @@ public void Should_Increment_Failure_Count_When_Call_Times_Out( )
public class AnAsynchronousCircuitBreakerThatIsHalfOpen : CircuitBreakerSpecBase
{
[Fact(DisplayName = "An asynchronous circuit breaker that is half open should pass call and transition to close on success")]
public void Should_Pass_Call_And_Transition_To_Close_On_Success( )
public async Task Should_Pass_Call_And_Transition_To_Close_On_Success( )
{
var breaker = ShortResetTimeoutCb( );
InterceptExceptionType<TestException>( ( ) => breaker.Instance.WithCircuitBreaker( () => Task.Factory.StartNew( ThrowException ) ) );
await InterceptExceptionTypeAsync<TestException>( async ( ) =>
await breaker.Instance.WithCircuitBreaker( () => Task.Factory.StartNew( ThrowException ) ) );
Assert.True( CheckLatch( breaker.HalfOpenLatch ) );

var result = breaker.Instance.WithCircuitBreaker( () => Task.Factory.StartNew( ( ) => SayTest( ) ) );
var result = await breaker.Instance.WithCircuitBreaker( async
() => await Task.Factory.StartNew( SayTest ) );

Assert.True( CheckLatch( breaker.ClosedLatch ) );
Assert.Equal( SayTest( ), result.Result );
Assert.Equal( SayTest( ), result );
}

[Fact(DisplayName = "An asynchronous circuit breaker that is half open should pass call and transition to open on exception")]
public void Should_Pass_Call_And_Transition_To_Open_On_Exception( )
public async Task Should_Pass_Call_And_Transition_To_Open_On_Exception( )
{
var breaker = ShortResetTimeoutCb( );


Assert.True( InterceptExceptionType<TestException>( ( ) => breaker.Instance.WithCircuitBreaker( () => Task.Factory.StartNew( ThrowException ) ).Wait( ) ) );
Assert.True( await InterceptExceptionTypeAsync<TestException>( async () =>
await breaker.Instance.WithCircuitBreaker( async () =>
await Task.Factory.StartNew( ThrowException ))));
Assert.True( CheckLatch( breaker.HalfOpenLatch ) );

Assert.True( InterceptExceptionType<TestException>( ( ) => breaker.Instance.WithCircuitBreaker( () => Task.Factory.StartNew( ThrowException ) ).Wait( ) ) );
Assert.True( await InterceptExceptionTypeAsync<TestException>( async () =>
await breaker.Instance.WithCircuitBreaker( async () =>
await Task.Factory.StartNew( ThrowException ))));
Assert.True( CheckLatch( breaker.OpenLatch ) );
}

Expand All @@ -318,53 +328,65 @@ public void Should_Pass_Call_And_Transition_To_Open_On_Async_Failure( )
{
var breaker = ShortResetTimeoutCb( );

breaker.Instance.WithCircuitBreaker( () => Task.Factory.StartNew( ThrowException ) );
breaker.Instance.WithCircuitBreaker( async () => await Task.Run( ThrowException ) );
Assert.True( CheckLatch( breaker.HalfOpenLatch ) );

breaker.Instance.WithCircuitBreaker( () => Task.Factory.StartNew( ThrowException ) );
breaker.Instance.WithCircuitBreaker( async () => await Task.Run( ThrowException ) );
Assert.True( CheckLatch( breaker.OpenLatch ) );
}
}

public class AnAsynchronousCircuitBreakerThatIsOpen : CircuitBreakerSpecBase
{
[Fact(DisplayName = "An asynchronous circuit breaker that is open should throw exceptions when called before reset timeout")]
public void Should_Throw_Exceptions_When_Called_Before_Reset_Timeout( )
public async Task Should_Throw_Exceptions_When_Called_Before_Reset_Timeout( )
{
var breaker = LongResetTimeoutCb( );

Assert.True( InterceptExceptionType<TestException>( ( ) => breaker.Instance.WithCircuitBreaker( () => Task.Factory.StartNew( ThrowException ) ).Wait( ) ) );
Assert.True( await InterceptExceptionTypeAsync<TestException>(async ( ) =>
await breaker.Instance.WithCircuitBreaker( async () =>
await Task.Factory.StartNew( ThrowException ) ) ) );
Assert.True( CheckLatch( breaker.OpenLatch ) );
Assert.True( InterceptExceptionType<OpenCircuitException>( ( ) => breaker.Instance.WithCircuitBreaker( () => Task.Factory.StartNew( ThrowException ) ).Wait( ) ) );
Assert.True( await InterceptExceptionTypeAsync<OpenCircuitException>(async ( ) =>
await breaker.Instance.WithCircuitBreaker( async () =>
await Task.Factory.StartNew( ThrowException ) ) ) );
}

[Fact(DisplayName = "An asynchronous circuit breaker that is open should transition to half open when reset timeout")]
public void Should_Transition_To_Half_Open_When_Reset_Timeout( )
public async Task Should_Transition_To_Half_Open_When_Reset_Timeout( )
{
var breaker = ShortResetTimeoutCb( );

Assert.True( InterceptExceptionType<TestException>( ( ) => breaker.Instance.WithCircuitBreaker( () => Task.Factory.StartNew( ThrowException ) ).Wait( ) ) );
Assert.True( await InterceptExceptionTypeAsync<TestException>( async () =>
await breaker.Instance.WithCircuitBreaker( async () =>
await Task.Factory.StartNew( ThrowException ) ) ) );
Assert.True( CheckLatch( breaker.HalfOpenLatch ) );
}

[Fact(DisplayName = "An asynchronous circuit breaker that is open should increase the reset timeout after it transits to open again")]
public void Should_Reset_Timeout_After_It_Transits_To_Open_Again()
public async Task Should_Reset_Timeout_After_It_Transits_To_Open_Again()
{
var breaker = NonOneFactorCb();
Assert.True(InterceptExceptionType<TestException>(() => breaker.Instance.WithCircuitBreaker(() => Task.Run(ThrowException)).Wait()));
Assert.True(await InterceptExceptionTypeAsync<TestException>(async () =>
await breaker.Instance.WithCircuitBreaker(async () =>
await Task.Run(ThrowException))));
Assert.True(CheckLatch(breaker.OpenLatch));

var e1 = InterceptException<OpenCircuitException>(() => breaker.Instance.WithSyncCircuitBreaker(SayTest));
var e1 = await InterceptExceptionAsync<OpenCircuitException>(async () =>
await breaker.Instance.WithCircuitBreaker(() => Task.FromResult(SayTest())));
var shortRemainingDuration = e1.RemainingDuration;

Thread.Sleep(1000);
await Task.Delay(1000);
Assert.True(CheckLatch(breaker.HalfOpenLatch));

// transit to open again
Assert.True(InterceptExceptionType<TestException>(() => breaker.Instance.WithCircuitBreaker(() => Task.Run(ThrowException)).Wait()));
Assert.True(await InterceptExceptionTypeAsync<TestException>(async () =>
await breaker.Instance.WithCircuitBreaker(async () =>
await Task.Run(ThrowException))));
Assert.True(CheckLatch(breaker.OpenLatch));

var e2 = InterceptException<OpenCircuitException>(() => breaker.Instance.WithSyncCircuitBreaker(SayTest));
var e2 = await InterceptExceptionAsync<OpenCircuitException>(async () =>
await breaker.Instance.WithCircuitBreaker(() => Task.FromResult(SayTest())));
var longRemainingDuration = e2.RemainingDuration;

Assert.True(shortRemainingDuration < longRemainingDuration);
Expand All @@ -373,8 +395,7 @@ public void Should_Reset_Timeout_After_It_Transits_To_Open_Again()

public class CircuitBreakerSpecBase : AkkaSpec
{
private readonly TimeSpan _awaitTimeout = TimeSpan.FromSeconds(2);
public TimeSpan AwaitTimeout { get { return _awaitTimeout; } }
public TimeSpan AwaitTimeout { get; } = TimeSpan.FromSeconds(2);

public bool CheckLatch( CountdownEvent latch )
{
Expand Down Expand Up @@ -413,6 +434,22 @@ protected T InterceptException<T>(Action actionThatThrows) where T : Exception
});
}

protected async Task<T> InterceptExceptionAsync<T>(Func<Task> actionThatThrows) where T : Exception
{
return await Assert.ThrowsAsync<T>(async () =>
{
try
{
await actionThatThrows();
}
catch (AggregateException ex)
{
foreach (var e in ex.Flatten().InnerExceptions.Where(e => e is T).Select(e => e))
throw e;
}
});
}

[SuppressMessage( "Microsoft.Design", "CA1004:GenericMethodsShouldProvideTypeParameter" )]
public bool InterceptExceptionType<T>( Action action ) where T : Exception
{
Expand Down Expand Up @@ -440,11 +477,11 @@ public bool InterceptExceptionType<T>( Action action ) where T : Exception
return true;
}

public async Task<bool> InterceptExceptionTypeAsync<T>(Task action) where T : Exception
public async Task<bool> InterceptExceptionTypeAsync<T>(Func<Task> action) where T : Exception
{
try
{
await action;
await action();
return false;
}
catch (Exception ex)
Expand Down