Skip to content

Commit 6334b39

Browse files
committed
Removed SuspendUntilFirstOfType reactive operator
1 parent c756e2c commit 6334b39

File tree

4 files changed

+15
-17
lines changed

4 files changed

+15
-17
lines changed

Core/Cleipnir.ResilientFunctions.Tests/ReactiveTests/SuspensionTests.cs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
1+
using System;
12
using System.Threading.Tasks;
23
using Cleipnir.ResilientFunctions.Domain;
34
using Cleipnir.ResilientFunctions.Domain.Exceptions;
4-
using Cleipnir.ResilientFunctions.Reactive;
55
using Cleipnir.ResilientFunctions.Reactive.Extensions;
66
using Cleipnir.ResilientFunctions.Reactive.Origin;
77
using Microsoft.VisualStudio.TestTools.UnitTesting;
@@ -20,7 +20,7 @@ public async Task SuspensionExceptionIsThrownWhenNoEventHasBeenEmittedFromLeafOp
2020
source.SignalNext("world", new InterruptCount(2));
2121

2222
await Should.ThrowAsync<SuspendInvocationException>(
23-
() => source.SuspendUntilFirstOfType<int>()
23+
() => source.FirstOfType<int>(TimeSpan.Zero)
2424
);
2525
}
2626

@@ -32,7 +32,7 @@ public async Task EventIsEmittedInResultWhenEventHasBeenEmittedFromLeafOperator(
3232
source.SignalNext(1, new InterruptCount(2));
3333
source.SignalNext("world", new InterruptCount(3));
3434

35-
var next = await source.SuspendUntilFirstOfType<int>();
35+
var next = await source.FirstOfType<int>(TimeSpan.Zero);
3636

3737
next.ShouldBe(1);
3838
}

Core/Cleipnir.ResilientFunctions.Tests/TestTemplates/RFunctionTests/MessagingTests.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,7 @@ public async Task FunctionIsSuspendedWhenAwaitedMessageDoesNotAlreadyExist(Task<
6262
inner: async Task<string> (string _, Workflow workflow) =>
6363
{
6464
var messages = workflow.Messages;
65-
return await messages.SuspendUntilFirstOfType<string>();
65+
return await messages.FirstOfType<string>(maxWait: TimeSpan.Zero);
6666
}
6767
);
6868

@@ -148,7 +148,7 @@ public async Task ScheduleInvocationWithPublishResultToSpecifiedFunctionId(Task<
148148
inner: async Task<string> (string _, Workflow workflow) =>
149149
{
150150
await child.Schedule(childFunctionId.InstanceId.Value, param: "stuff");
151-
return await workflow.Messages.SuspendUntilFirstOfType<string>();
151+
return await workflow.Messages.FirstOfType<string>(TimeSpan.Zero);
152152
}
153153
);
154154

Core/Cleipnir.ResilientFunctions.Tests/TestTemplates/RFunctionTests/SuspensionTests.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -225,7 +225,7 @@ protected async Task SuspendedFunctionIsAutomaticallyReInvokedWhenEligibleAndWri
225225
async Task<string> (string param, Workflow workflow) =>
226226
{
227227
var messages = workflow.Messages;
228-
var next = await messages.SuspendUntilFirstOfType<string>();
228+
var next = await messages.FirstOfType<string>(TimeSpan.Zero);
229229
return next;
230230
}
231231
);
@@ -271,7 +271,7 @@ protected async Task SuspendedFunctionIsAutomaticallyReInvokedWhenEligibleByWatc
271271
async Task<string> (string param, Workflow workflow) =>
272272
{
273273
var messages = workflow.Messages;
274-
var next = await messages.SuspendUntilFirstOfType<string>();
274+
var next = await messages.FirstOfType<string>(TimeSpan.Zero);
275275
return next;
276276
}
277277
);

Core/Cleipnir.ResilientFunctions/Reactive/Extensions/LeafOperators.cs

Lines changed: 8 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -93,24 +93,22 @@ public static Task<Option<T>> SuspendUntilFirstOrNone<T>(this IReactiveChain<T>
9393
? new Option<T>(emits.Single())
9494
: Option<T>.NoValue
9595
);
96-
public static Task<T> SuspendUntilFirstOfType<T>(this IReactiveChain<object> s, TimeSpan? maxWait = null)
97-
=> s.OfType<T>().SuspendUntilFirst(maxWait);
9896
public static Task<List<T>> SuspendUntilFirsts<T>(this IReactiveChain<T> s, int count, TimeSpan? maxWait = null)
9997
=> s.Take(count).ToList(maxWait: maxWait ?? TimeSpan.Zero);
10098

101-
public static Task<T> First<T>(this IReactiveChain<T> s)
102-
=> FirstOrNone(s)
99+
public static Task<T> First<T>(this IReactiveChain<T> s, TimeSpan? maxWait = null)
100+
=> FirstOrNone(s, maxWait)
103101
.SelectAsync(o => o.HasValue ? o.Value : throw new NoResultException());
104102
public static Task<T?> FirstOrDefault<T>(this IReactiveChain<T> s)
105103
=> FirstOrNone(s)
106104
.SelectAsync(o => o.HasValue ? o.Value : default);
107-
public static Task<Option<T>> FirstOrNone<T>(this IReactiveChain<T> s)
108-
=> Firsts(s, count: 1)
105+
public static Task<Option<T>> FirstOrNone<T>(this IReactiveChain<T> s, TimeSpan? maxWait = null)
106+
=> Firsts(s, count: 1, maxWait)
109107
.SelectAsync(
110108
l => l.Any() ? new Option<T>(l.First()) : Option<T>.NoValue
111109
);
112-
public static Task<T> FirstOfType<T>(this IReactiveChain<object> s)
113-
=> s.OfType<T>().First();
110+
public static Task<T> FirstOfType<T>(this IReactiveChain<object> s, TimeSpan? maxWait = null)
111+
=> s.OfType<T>().First(maxWait);
114112

115113
public static Task<Option<T>> FirstOfType<T>(this IReactiveChain<object> s, string timeoutId, DateTime expiresAt)
116114
=> s.OfType<T>().TakeUntilTimeout(timeoutId, expiresAt).FirstOrNone();
@@ -119,8 +117,8 @@ public static Task<Option<T>> FirstOfType<T>(this IReactiveChain<object> s, stri
119117

120118
public static Task<T> FirstOf<T>(this IReactiveChain<object> s) => s.FirstOfType<T>();
121119

122-
public static Task<List<T>> Firsts<T>(this IReactiveChain<T> s, int count)
123-
=> s.Take(count).ToList();
120+
public static Task<List<T>> Firsts<T>(this IReactiveChain<T> s, int count, TimeSpan? maxWait = null)
121+
=> s.Take(count).ToList(maxWait);
124122

125123
#endregion
126124

0 commit comments

Comments
 (0)