Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions Rx.NET/Source/src/System.Reactive/Linq/IQueryLanguage.cs
Original file line number Diff line number Diff line change
Expand Up @@ -564,6 +564,8 @@ internal interface IQueryLanguage

#region * Single *

IObservable<TSource> Append<TSource>(IObservable<TSource> source, TSource value);
IObservable<TSource> Append<TSource>(IObservable<TSource> source, TSource value, IScheduler scheduler);
IObservable<TSource> AsObservable<TSource>(IObservable<TSource> source);
IObservable<IList<TSource>> Buffer<TSource>(IObservable<TSource> source, int count);
IObservable<IList<TSource>> Buffer<TSource>(IObservable<TSource> source, int count, int skip);
Expand All @@ -580,6 +582,8 @@ internal interface IQueryLanguage
IObservable<TSource> Finally<TSource>(IObservable<TSource> source, Action finallyAction);
IObservable<TSource> IgnoreElements<TSource>(IObservable<TSource> source);
IObservable<Notification<TSource>> Materialize<TSource>(IObservable<TSource> source);
IObservable<TSource> Prepend<TSource>(IObservable<TSource> source, TSource value);
IObservable<TSource> Prepend<TSource>(IObservable<TSource> source, TSource value, IScheduler scheduler);
IObservable<TSource> Repeat<TSource>(IObservable<TSource> source);
IObservable<TSource> Repeat<TSource>(IObservable<TSource> source, int repeatCount);
IObservable<TSource> RepeatWhen<TSource, TSignal>(IObservable<TSource> source, Func<IObservable<object>, IObservable<TSignal>> handler);
Expand Down
77 changes: 77 additions & 0 deletions Rx.NET/Source/src/System.Reactive/Linq/Observable.Single.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,45 @@ namespace System.Reactive.Linq
{
public static partial class Observable
{
#region + Append +

/// <summary>
/// Append a value to an observable sequence.
/// </summary>
/// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
/// <param name="source">Source sequence to append the value to.</param>
/// <param name="value">Value to append to the specified sequence.</param>
/// <returns>The source sequence appended with the specified value.</returns>
/// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception>
public static IObservable<TSource> Append<TSource>(this IObservable<TSource> source, TSource value)
{
if (source == null)
throw new ArgumentNullException(nameof(source));

return s_impl.Append<TSource>(source, value);
}

/// <summary>
/// Append a value to an observable sequence.
/// </summary>
/// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
/// <param name="source">Source sequence to append the value to.</param>
/// <param name="value">Value to append to the specified sequence.</param>
/// <param name="scheduler">Scheduler to emit the append values on.</param>
/// <returns>The source sequence appended with the specified value.</returns>
/// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception>
public static IObservable<TSource> Append<TSource>(this IObservable<TSource> source, TSource value, IScheduler scheduler)
{
if (source == null)
throw new ArgumentNullException(nameof(source));
if (scheduler == null)
throw new ArgumentNullException(nameof(scheduler));

return s_impl.Append<TSource>(source, value, scheduler);
}

#endregion

#region + AsObservable +

/// <summary>
Expand Down Expand Up @@ -341,6 +380,44 @@ public static IObservable<Notification<TSource>> Materialize<TSource>(this IObse

#endregion

#region + Prepend +

/// <summary>
/// Prepend a value to an observable sequence.
/// </summary>
/// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
/// <param name="source">Source sequence to prepend the value to.</param>
/// <param name="value">Value to prepend to the specified sequence.</param>
/// <returns>The source sequence prepended with the specified value.</returns>
/// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception>
public static IObservable<TSource> Prepend<TSource>(this IObservable<TSource> source, TSource value)
{
if (source == null)
throw new ArgumentNullException(nameof(source));

return s_impl.Prepend<TSource>(source, value);
}

/// <summary>
/// Prepend a value to an observable sequence.
/// </summary>
/// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
/// <param name="source">Source sequence to prepend the value to.</param>
/// <param name="value">Value to prepend to the specified sequence.</param>
/// <param name="scheduler">Scheduler to emit the prepend values on.</param>
/// <returns>The source sequence prepended with the specified value.</returns>
/// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception>
public static IObservable<TSource> Prepend<TSource>(this IObservable<TSource> source, TSource value, IScheduler scheduler)
{
if (source == null)
throw new ArgumentNullException(nameof(source));
if (scheduler == null)
throw new ArgumentNullException(nameof(scheduler));

return s_impl.Prepend<TSource>(source, value, scheduler);
}

#endregion
#region + Repeat +

/// <summary>
Expand Down
122 changes: 121 additions & 1 deletion Rx.NET/Source/src/System.Reactive/Linq/Qbservable.Generated.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* WARNING: Auto-generated file (06/12/2018 13:00:48)
* WARNING: Auto-generated file (merged on 06/13/2018)
* Run Rx's auto-homoiconizer tool to generate this file (in the HomoIcon directory).
*/

Expand Down Expand Up @@ -310,6 +310,66 @@ public static IQbservable<bool> Any<TSource>(this IQbservable<TSource> source, E
);
}

/// <summary>
/// Append a value to an observable sequence.
/// </summary>
/// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
/// <param name="source">Source sequence to append the value to.</param>
/// <param name="value">Value to append to the specified sequence.</param>
/// <returns>The source sequence appended with the specified value.</returns>
/// <exception cref="ArgumentNullException">
/// <paramref name="source" /> is null.</exception>
public static IQbservable<TSource> Append<TSource>(this IQbservable<TSource> source, TSource value)
{
if (source == null)
throw new ArgumentNullException(nameof(source));

return source.Provider.CreateQuery<TSource>(
Expression.Call(
null,
#if CRIPPLED_REFLECTION
InfoOf(() => Qbservable.Append<TSource>(default(IQbservable<TSource>), default(TSource))),
#else
((MethodInfo)MethodInfo.GetCurrentMethod()).MakeGenericMethod(typeof(TSource)),
#endif
source.Expression,
Expression.Constant(value, typeof(TSource))
)
);
}

/// <summary>
/// Append a value to an observable sequence.
/// </summary>
/// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
/// <param name="source">Source sequence to append the value to.</param>
/// <param name="value">Value to append to the specified sequence.</param>
/// <param name="scheduler">Scheduler to emit the append values on.</param>
/// <returns>The source sequence appended with the specified value.</returns>
/// <exception cref="ArgumentNullException">
/// <paramref name="source" /> is null.</exception>
public static IQbservable<TSource> Append<TSource>(this IQbservable<TSource> source, TSource value, IScheduler scheduler)
{
if (source == null)
throw new ArgumentNullException(nameof(source));
if (scheduler == null)
throw new ArgumentNullException(nameof(scheduler));

return source.Provider.CreateQuery<TSource>(
Expression.Call(
null,
#if CRIPPLED_REFLECTION
InfoOf(() => Qbservable.Append<TSource>(default(IQbservable<TSource>), default(TSource), default(IScheduler))),
#else
((MethodInfo)MethodInfo.GetCurrentMethod()).MakeGenericMethod(typeof(TSource)),
#endif
source.Expression,
Expression.Constant(value, typeof(TSource)),
Expression.Constant(scheduler, typeof(IScheduler))
)
);
}

/// <summary>
/// Automatically connect the upstream IConnectableObservable at most once when the
/// specified number of IObservers have subscribed to this IObservable.
Expand Down Expand Up @@ -10472,6 +10532,66 @@ public static IQbservable<TSource> OnErrorResumeNext<TSource>(this IQbservablePr
);
}

/// <summary>
/// Prepend a value to an observable sequence.
/// </summary>
/// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
/// <param name="source">Source sequence to prepend the value to.</param>
/// <param name="value">Value to prepend to the specified sequence.</param>
/// <returns>The source sequence prepended with the specified value.</returns>
/// <exception cref="ArgumentNullException">
/// <paramref name="source" /> is null.</exception>
public static IQbservable<TSource> Prepend<TSource>(this IQbservable<TSource> source, TSource value)
{
if (source == null)
throw new ArgumentNullException(nameof(source));

return source.Provider.CreateQuery<TSource>(
Expression.Call(
null,
#if CRIPPLED_REFLECTION
InfoOf(() => Qbservable.Prepend<TSource>(default(IQbservable<TSource>), default(TSource))),
#else
((MethodInfo)MethodInfo.GetCurrentMethod()).MakeGenericMethod(typeof(TSource)),
#endif
source.Expression,
Expression.Constant(value, typeof(TSource))
)
);
}

/// <summary>
/// Prepend a value to an observable sequence.
/// </summary>
/// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
/// <param name="source">Source sequence to prepend the value to.</param>
/// <param name="value">Value to prepend to the specified sequence.</param>
/// <param name="scheduler">Scheduler to emit the prepend values on.</param>
/// <returns>The source sequence prepended with the specified value.</returns>
/// <exception cref="ArgumentNullException">
/// <paramref name="source" /> is null.</exception>
public static IQbservable<TSource> Prepend<TSource>(this IQbservable<TSource> source, TSource value, IScheduler scheduler)
{
if (source == null)
throw new ArgumentNullException(nameof(source));
if (scheduler == null)
throw new ArgumentNullException(nameof(scheduler));

return source.Provider.CreateQuery<TSource>(
Expression.Call(
null,
#if CRIPPLED_REFLECTION
InfoOf(() => Qbservable.Prepend<TSource>(default(IQbservable<TSource>), default(TSource), default(IScheduler))),
#else
((MethodInfo)MethodInfo.GetCurrentMethod()).MakeGenericMethod(typeof(TSource)),
#endif
source.Expression,
Expression.Constant(value, typeof(TSource)),
Expression.Constant(scheduler, typeof(IScheduler))
)
);
}

/// <summary>
/// Returns an observable sequence that is the result of invoking the selector on a connectable observable sequence that shares a single subscription to the underlying sequence.
/// This operator is a specialization of Multicast using a regular <see cref="T:System.Reactive.Subjects.Subject`1" />.
Expand Down
38 changes: 38 additions & 0 deletions Rx.NET/Source/src/System.Reactive/Linq/QueryLanguage.Single.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,25 @@ namespace System.Reactive.Linq

internal partial class QueryLanguage
{
#region - Append -

public virtual IObservable<TSource> Append<TSource>(IObservable<TSource> source, TSource value)
{
return Append_<TSource>(source, value, SchedulerDefaults.ConstantTimeOperations);
}

public virtual IObservable<TSource> Append<TSource>(IObservable<TSource> source, TSource value, IScheduler scheduler)
{
return Append_<TSource>(source, value, scheduler);
}

private static IObservable<TSource> Append_<TSource>(IObservable<TSource> source, TSource value, IScheduler scheduler)
{
return source.Concat(new [] { value }.ToObservable(scheduler));
}

#endregion

#region + AsObservable +

public virtual IObservable<TSource> AsObservable<TSource>(IObservable<TSource> source)
Expand Down Expand Up @@ -154,6 +173,25 @@ public virtual IObservable<Notification<TSource>> Materialize<TSource>(IObservab

#endregion

#region - Prepend -

public virtual IObservable<TSource> Prepend<TSource>(IObservable<TSource> source, TSource value)
{
return Prepend_<TSource>(source, value, SchedulerDefaults.ConstantTimeOperations);
}

public virtual IObservable<TSource> Prepend<TSource>(IObservable<TSource> source, TSource value, IScheduler scheduler)
{
return Prepend_<TSource>(source, value, scheduler);
}

private static IObservable<TSource> Prepend_<TSource>(IObservable<TSource> source, TSource value, IScheduler scheduler)
{
return StartWith_(source, scheduler, new[] { value });
}

#endregion

#region - Repeat -

public virtual IObservable<TSource> Repeat<TSource>(IObservable<TSource> source)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -843,6 +843,8 @@ namespace System.Reactive.Linq
public static System.Reactive.Joins.Pattern<TLeft, TRight> And<TLeft, TRight>(this System.IObservable<TLeft> left, System.IObservable<TRight> right) { }
public static System.IObservable<bool> Any<TSource>(this System.IObservable<TSource> source) { }
public static System.IObservable<bool> Any<TSource>(this System.IObservable<TSource> source, System.Func<TSource, bool> predicate) { }
public static System.IObservable<TSource> Append<TSource>(this System.IObservable<TSource> source, TSource value) { }
public static System.IObservable<TSource> Append<TSource>(this System.IObservable<TSource> source, TSource value, System.Reactive.Concurrency.IScheduler scheduler) { }
public static System.IObservable<TSource> AsObservable<TSource>(this System.IObservable<TSource> source) { }
public static System.IObservable<TSource> AutoConnect<TSource>(this System.Reactive.Subjects.IConnectableObservable<TSource> source, int minObservers = 1, System.Action<System.IDisposable> onConnect = null) { }
public static System.IObservable<double> Average(this System.IObservable<double> source) { }
Expand Down Expand Up @@ -1237,6 +1239,8 @@ namespace System.Reactive.Linq
public static System.IObservable<TSource> OnErrorResumeNext<TSource>(this System.IObservable<TSource> first, System.IObservable<TSource> second) { }
public static System.IObservable<TSource> OnErrorResumeNext<TSource>(params System.IObservable<>[] sources) { }
public static System.IObservable<TSource> OnErrorResumeNext<TSource>(this System.Collections.Generic.IEnumerable<System.IObservable<TSource>> sources) { }
public static System.IObservable<TSource> Prepend<TSource>(this System.IObservable<TSource> source, TSource value) { }
public static System.IObservable<TSource> Prepend<TSource>(this System.IObservable<TSource> source, TSource value, System.Reactive.Concurrency.IScheduler scheduler) { }
public static System.Reactive.Subjects.IConnectableObservable<TSource> Publish<TSource>(this System.IObservable<TSource> source) { }
public static System.IObservable<TResult> Publish<TSource, TResult>(this System.IObservable<TSource> source, System.Func<System.IObservable<TSource>, System.IObservable<TResult>> selector) { }
public static System.Reactive.Subjects.IConnectableObservable<TSource> Publish<TSource>(this System.IObservable<TSource> source, TSource initialValue) { }
Expand Down Expand Up @@ -1622,6 +1626,8 @@ namespace System.Reactive.Linq
public static System.Reactive.Joins.QueryablePattern<TLeft, TRight> And<TLeft, TRight>(this System.Reactive.Linq.IQbservable<TLeft> left, System.IObservable<TRight> right) { }
public static System.Reactive.Linq.IQbservable<bool> Any<TSource>(this System.Reactive.Linq.IQbservable<TSource> source) { }
public static System.Reactive.Linq.IQbservable<bool> Any<TSource>(this System.Reactive.Linq.IQbservable<TSource> source, System.Linq.Expressions.Expression<System.Func<TSource, bool>> predicate) { }
public static System.Reactive.Linq.IQbservable<TSource> Append<TSource>(this System.Reactive.Linq.IQbservable<TSource> source, TSource value) { }
public static System.Reactive.Linq.IQbservable<TSource> Append<TSource>(this System.Reactive.Linq.IQbservable<TSource> source, TSource value, System.Reactive.Concurrency.IScheduler scheduler) { }
public static System.IObservable<TSource> AsObservable<TSource>(this System.Reactive.Linq.IQbservable<TSource> source) { }
public static System.Reactive.Linq.IQbservable<TSource> AsQbservable<TSource>(this System.IObservable<TSource> source) { }
public static System.Reactive.Linq.IQbservable<TSource> AutoConnect<TSource>(this System.Reactive.Linq.IQbservableProvider provider, System.Reactive.Subjects.IConnectableObservable<TSource> source, int minObservers, System.Linq.Expressions.Expression<System.Action<System.IDisposable>> onConnect) { }
Expand Down Expand Up @@ -1959,6 +1965,8 @@ namespace System.Reactive.Linq
public static System.Reactive.Linq.IQbservable<TSource> OnErrorResumeNext<TSource>(this System.Reactive.Linq.IQbservable<TSource> first, System.IObservable<TSource> second) { }
public static System.Reactive.Linq.IQbservable<TSource> OnErrorResumeNext<TSource>(this System.Reactive.Linq.IQbservableProvider provider, params System.IObservable<>[] sources) { }
public static System.Reactive.Linq.IQbservable<TSource> OnErrorResumeNext<TSource>(this System.Reactive.Linq.IQbservableProvider provider, System.Collections.Generic.IEnumerable<System.IObservable<TSource>> sources) { }
public static System.Reactive.Linq.IQbservable<TSource> Prepend<TSource>(this System.Reactive.Linq.IQbservable<TSource> source, TSource value) { }
public static System.Reactive.Linq.IQbservable<TSource> Prepend<TSource>(this System.Reactive.Linq.IQbservable<TSource> source, TSource value, System.Reactive.Concurrency.IScheduler scheduler) { }
public static System.Reactive.Linq.IQbservable<TResult> Publish<TSource, TResult>(this System.Reactive.Linq.IQbservable<TSource> source, System.Linq.Expressions.Expression<System.Func<System.IObservable<TSource>, System.IObservable<TResult>>> selector) { }
public static System.Reactive.Linq.IQbservable<TResult> Publish<TSource, TResult>(this System.Reactive.Linq.IQbservable<TSource> source, System.Linq.Expressions.Expression<System.Func<System.IObservable<TSource>, System.IObservable<TResult>>> selector, TSource initialValue) { }
public static System.Reactive.Linq.IQbservable<TResult> PublishLast<TSource, TResult>(this System.Reactive.Linq.IQbservable<TSource> source, System.Linq.Expressions.Expression<System.Func<System.IObservable<TSource>, System.IObservable<TResult>>> selector) { }
Expand Down
Loading