Bug in Merge for USE_FAIR_AND_CHEAPER_MERGE #2134

@fabianoliver

I think the Merge implementation when using USE_FAIR_AND_CHEAPER_MERGE may be incorrect.

Consider the (admittedly crude) unit test shown after the stack trace below. It does not throw an AggregateException with an inner "test" exception; instead, the process crashes with:

Exit code is -532462766 (Unhandled exception. System.InvalidOperationException: Operation is not valid due to the current state of the object.
   at System.Threading.Tasks.Sources.ManualResetValueTaskSourceCore`1.OnCompleted(Action`1 continuation, Object state, Int16 token, ValueTaskSourceOnCompletedFlags flags)
   at TestProject1.Tests.DontThrow()+System.Threading.Tasks.Sources.IValueTaskSource<System.Boolean>.OnCompleted()
   at System.Runtime.CompilerServices.AsyncTaskMethodBuilder`1.AwaitUnsafeOnCompleted[TAwaiter](TAwaiter& awaiter, IAsyncStateMachineBox box)
--- End of stack trace from previous location ---
   at System.Threading.Tasks.Task.<>c.<ThrowAsync>b__128_1(Object state)
   at System.Threading.QueueUserWorkItemCallback.Execute()
   at System.Threading.ThreadPoolWorkQueue.Dispatch()
   at System.Threading.PortableThreadPool.WorkerThread.WorkerThreadStart())

    [Test]
    public async Task Test()
    {
        var observables = new[] { DontThrow(), Throw() };
        var merged = AsyncEnumerableEx.Merge(observables);

        await foreach (var _ in merged)
        {
            // ignored
        }
    }

    private async IAsyncEnumerable<int> DontThrow()
    {
        await Task.Delay(TimeSpan.FromSeconds(20));
        yield return 1;
    }
    
    private async IAsyncEnumerable<int> Throw()
    {
        await Task.Delay(TimeSpan.FromSeconds(10));
        throw new Exception("test");
        yield break; // unreachable, but a yield statement is needed for this method to compile as an async iterator
    }

I think this happens because the implementation may await a given ValueTask up to three times. Worse, the later awaits may hit an already completed ValueTask, which to the best of my knowledge is not allowed and results in undefined behaviour (in the async case, the backing state machine may be recycled for other ValueTasks after the first await).
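
To illustrate the kind of failure seen in the stack trace, here is a minimal sketch that registers two continuations on the same pending ValueTask. `PendingSource` and `DoubleAwaitDemo` are hypothetical names made up for this example; only `ManualResetValueTaskSourceCore<T>`, `IValueTaskSource<T>` and `ValueTask<T>` come from the framework. It is not the code path inside Merge, just the smallest reproduction of "registered on twice" that I could come up with.

```csharp
#nullable enable
using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Sources;

// Hypothetical single-use source backed by ManualResetValueTaskSourceCore<bool>,
// the same core type that appears in the stack trace.
sealed class PendingSource : IValueTaskSource<bool>
{
    // Mutable struct: must not be declared readonly.
    private ManualResetValueTaskSourceCore<bool> _core;

    public ValueTask<bool> AsValueTask() => new ValueTask<bool>(this, _core.Version);

    public bool GetResult(short token) => _core.GetResult(token);

    public ValueTaskSourceStatus GetStatus(short token) => _core.GetStatus(token);

    public void OnCompleted(Action<object?> continuation, object? state, short token,
                            ValueTaskSourceOnCompletedFlags flags)
        => _core.OnCompleted(continuation, state, token, flags);
}

static class DoubleAwaitDemo
{
    // Returns true if the second continuation registration is rejected.
    public static bool SecondRegistrationThrows()
    {
        // The source is never completed, so the ValueTask stays pending.
        ValueTask<bool> pending = new PendingSource().AsValueTask();

        // First registration: this is what an await (or a WhenAny-style helper) does.
        pending.GetAwaiter().OnCompleted(() => { });

        try
        {
            // Second registration on the same pending ValueTask.
            pending.GetAwaiter().OnCompleted(() => { });
            return false;
        }
        catch (InvalidOperationException)
        {
            return true;
        }
    }

    static void Main() => Console.WriteLine(SecondRegistrationThrows());
}
```

In my understanding, a ValueTask supports a single consumer, so anything that needs to observe the same operation more than once should first convert it with `AsTask()` and work with the resulting Task.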

As a simple example, consider the following chain of events. For simplicity, assume the argument to Merge is an array containing a single IAsyncEnumerable.

  1. Line 58: MoveNext() is called, and the resulting ValueTask is stored in moveNextTasks.
  2. Line 61: A WhenAny awaitable construct is created, which effectively awaits this task (by registering an OnCompleted callback).
  3. Let's say that, after this happens, the source IAsyncEnumerable throws an exception.
  4. Line 68 now awaits WhenAny, which returns 0 (as our single input IAsyncEnumerable completed by throwing its exception).
  5. In line 73, we await the already created MoveNext ValueTask again. This is the very same ValueTask that was already awaited by the WhenAny construct, so we are now awaiting an already completed ValueTask for the second time.
  6. Because this await throws, we exit the block immediately and therefore do not clean up moveNextTasks (that is, we do not replace the existing, already awaited ValueTask with anything else).
  7. Finally, we end up in line 146, where we await the same ValueTask once more. So we are now awaiting it for the third time.
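
One possible direction for a fix, sketched under my own assumptions: convert every MoveNextAsync() result to a Task exactly once via AsTask(), and only ever await that Task, since a Task can safely be awaited several times. `MergeSketch` and `MergeDemo` are hypothetical names for this illustration; this is not the library's Merge and it ignores cancellation and the allocation savings that USE_FAIR_AND_CHEAPER_MERGE is presumably after.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

static class MergeDemo
{
    // Hypothetical helper for illustration only; not the library's implementation.
    public static async IAsyncEnumerable<T> MergeSketch<T>(params IAsyncEnumerable<T>[] sources)
    {
        var enumerators = new IAsyncEnumerator<T>[sources.Length];
        var pending = new Task<bool>[sources.Length]; // null means "source finished"
        try
        {
            for (var i = 0; i < sources.Length; i++)
            {
                enumerators[i] = sources[i].GetAsyncEnumerator();
                // Each ValueTask is consumed exactly once, by AsTask().
                pending[i] = enumerators[i].MoveNextAsync().AsTask();
            }

            var active = sources.Length;
            while (active > 0)
            {
                var finished = await Task.WhenAny(pending.Where(t => t != null));
                var index = Array.IndexOf(pending, finished);

                // Awaiting a Task a second time is fine; this rethrows source errors.
                if (await finished)
                {
                    yield return enumerators[index].Current;
                    pending[index] = enumerators[index].MoveNextAsync().AsTask();
                }
                else
                {
                    pending[index] = null;
                    active--;
                }
            }
        }
        finally
        {
            for (var i = 0; i < enumerators.Length; i++)
            {
                if (enumerators[i] == null) continue;
                // Let in-flight MoveNextAsync calls settle before disposing.
                if (pending[i] != null)
                {
                    try { await pending[i]; } catch { /* already observed or irrelevant */ }
                }
                await enumerators[i].DisposeAsync();
            }
        }
    }

    static async IAsyncEnumerable<int> DontThrow()
    {
        await Task.Delay(200);
        yield return 1;
    }

    static async IAsyncEnumerable<int> Throw()
    {
        await Task.Delay(100);
        throw new Exception("test");
        yield break; // unreachable, but makes this method an async iterator
    }

    // Returns the message of the exception surfaced by the merged sequence.
    public static async Task<string> RunAsync()
    {
        try
        {
            await foreach (var _ in MergeSketch(DontThrow(), Throw())) { }
            return "no exception";
        }
        catch (Exception ex)
        {
            return ex.Message;
        }
    }

    static async Task Main() => Console.WriteLine(await RunAsync());
}
```

With this shape, the "test" exception from the failing source reaches the caller directly rather than crashing the process. Note that the sketch surfaces the original exception, not an AggregateException; wrapping it would be a separate design decision.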

If there is interest, I'd be happy to open a PR that either attempts to fix this or removes this implementation altogether, depending on whether there are plans to switch on USE_FAIR_AND_CHEAPER_MERGE for NuGet releases in the foreseeable future.