Skip to content

Commit 269f2dd

Browse files
committed
DRYing PostponedWatchdog and CrashedWatchdog
1 parent 65664f2 commit 269f2dd

File tree

7 files changed

+204
-216
lines changed

7 files changed

+204
-216
lines changed

Core/Cleipnir.ResilientFunctions.Tests/TestTemplates/RFunctionTests/PostponedTests.cs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,9 +19,9 @@ protected async Task PostponedFuncIsCompletedByWatchDog(Task<IFunctionStore> sto
1919
{
2020
var store = await storeTask;
2121
var functionTypeId = nameof(PostponedFuncIsCompletedByWatchDog).ToFunctionTypeId();
22-
var unhandledExceptionHandler = new UnhandledExceptionCatcher();
2322
const string param = "test";
2423
{
24+
var unhandledExceptionHandler = new UnhandledExceptionCatcher();
2525
var crashableStore = new CrashableFunctionStore(store);
2626
using var functionsRegistry = new FunctionsRegistry
2727
(
@@ -44,6 +44,7 @@ await Should.ThrowAsync<FunctionInvocationPostponedException>(() =>
4444
unhandledExceptionHandler.ThrownExceptions.Count.ShouldBe(0);
4545
}
4646
{
47+
var unhandledExceptionHandler = new UnhandledExceptionCatcher();
4748
using var functionsRegistry = new FunctionsRegistry(
4849
store,
4950
new Settings(

Core/Cleipnir.ResilientFunctions.Tests/Utils/TaskLinq.cs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,8 @@ public static Task<TOut> Map<TIn, TOut>(this Task<TIn> task, Func<TIn, TOut> f)
1313
public static Task<List<T>> ToListAsync<T>(this Task<IEnumerable<T>> task)
1414
=> task.ContinueWith(t => t.Result.ToList());
1515

16-
public static Task<bool> Any<T>(this Task<IEnumerable<T>> task) => task.ContinueWith(t => t.Result.Any());
16+
public static Task<bool> AnyAsync<T>(this Task<IEnumerable<T>> task) => task.ContinueWith(t => t.Result.Any());
17+
public static Task<bool> AnyAsync<T>(this Task<IReadOnlyList<T>> task) => task.ContinueWith(t => t.Result.Any());
1718

1819
public static async Task<T> WithTimeout<T>(this Task<T> task, int thresholdMs)
1920
{
Lines changed: 8 additions & 96 deletions
Original file line numberDiff line numberDiff line change
@@ -1,114 +1,26 @@
11
using System;
2-
using System.Collections.Generic;
32
using System.Threading.Tasks;
4-
using Cleipnir.ResilientFunctions.CoreRuntime.Invocation;
5-
using Cleipnir.ResilientFunctions.Domain;
6-
using Cleipnir.ResilientFunctions.Domain.Exceptions;
7-
using Cleipnir.ResilientFunctions.Helpers;
8-
using Cleipnir.ResilientFunctions.Storage;
93

104
namespace Cleipnir.ResilientFunctions.CoreRuntime.Watchdogs;
115

126
internal class CrashedWatchdog
137
{
14-
private readonly FunctionTypeId _functionTypeId;
15-
private readonly ReInvoke _reInvoke;
16-
private readonly IFunctionStore _functionStore;
178
private readonly TimeSpan _leaseLength;
18-
private readonly TimeSpan _delayStartUp;
19-
private readonly UnhandledExceptionHandler _unhandledExceptionHandler;
20-
private readonly ShutdownCoordinator _shutdownCoordinator;
21-
22-
private readonly AsyncSemaphore _maxParallelismSemaphore;
23-
private readonly HashSet<FunctionInstanceId> _toBeExecuted = new();
24-
private readonly object _sync = new();
9+
private readonly Restarter _restarter;
2510

26-
public CrashedWatchdog(
27-
FunctionTypeId functionTypeId,
28-
IFunctionStore functionStore,
29-
ReInvoke reInvoke,
30-
AsyncSemaphore maxParallelismSemaphore,
31-
TimeSpan leaseLength,
32-
TimeSpan delayStartUp,
33-
UnhandledExceptionHandler unhandledExceptionHandler,
34-
ShutdownCoordinator shutdownCoordinator)
11+
public CrashedWatchdog(TimeSpan leaseLength, RestarterFactory restarterFactory)
3512
{
36-
_functionTypeId = functionTypeId;
37-
_functionStore = functionStore;
38-
_reInvoke = reInvoke;
39-
_maxParallelismSemaphore = maxParallelismSemaphore;
4013
_leaseLength = leaseLength;
41-
_delayStartUp = delayStartUp;
42-
_unhandledExceptionHandler = unhandledExceptionHandler;
43-
_shutdownCoordinator = shutdownCoordinator;
14+
_restarter = restarterFactory.Create(
15+
(functionTypeId, store, t) => store.GetCrashedFunctions(functionTypeId, leaseExpiresBefore: t)
16+
);
4417
}
4518

4619
public async Task Start()
4720
{
48-
if (_leaseLength == TimeSpan.Zero) return;
49-
await Task.Delay(_delayStartUp);
21+
if (_leaseLength == TimeSpan.Zero)
22+
return;
5023

51-
Start:
52-
try
53-
{
54-
while (!_shutdownCoordinator.ShutdownInitiated)
55-
{
56-
var hangingFunctions =
57-
await _functionStore.GetCrashedFunctions(_functionTypeId, leaseExpiresBefore: DateTime.UtcNow.Ticks);
58-
59-
foreach (var hangingFunction in hangingFunctions.RandomlyPermute())
60-
_ = ReInvokeCrashedFunction(hangingFunction);
61-
62-
await Task.Delay(_leaseLength);
63-
}
64-
}
65-
catch (Exception thrownException)
66-
{
67-
_unhandledExceptionHandler.Invoke(
68-
new FrameworkException(
69-
_functionTypeId,
70-
$"{nameof(CrashedWatchdog)} for '{_functionTypeId}' failed - retrying in 5 seconds",
71-
innerException: thrownException
72-
)
73-
);
74-
75-
await Task.Delay(5_000);
76-
goto Start;
77-
}
78-
}
79-
80-
private async Task ReInvokeCrashedFunction(InstanceIdAndEpoch sef)
81-
{
82-
lock (_sync)
83-
if (!_toBeExecuted.Add(sef.InstanceId))
84-
return;
85-
86-
using var @lock = await _maxParallelismSemaphore.Take();
87-
88-
if (_shutdownCoordinator.ShutdownInitiated) return;
89-
try
90-
{
91-
await _reInvoke(sef.InstanceId.Value, expectedEpoch: sef.Epoch);
92-
}
93-
catch (ObjectDisposedException) { } //ignore when functionsRegistry has been disposed
94-
catch (UnexpectedFunctionState) { } //ignore when the functions state has changed since fetching it
95-
catch (FunctionInvocationPostponedException) { }
96-
catch (FunctionInvocationSuspendedException) { }
97-
catch (Exception innerException)
98-
{
99-
var functionId = new FunctionId(_functionTypeId, sef.InstanceId);
100-
_unhandledExceptionHandler.Invoke(
101-
new FrameworkException(
102-
_functionTypeId,
103-
$"{nameof(CrashedWatchdog)} failed while executing: '{functionId}'",
104-
innerException
105-
)
106-
);
107-
}
108-
finally
109-
{
110-
lock (_sync)
111-
_toBeExecuted.Remove(sef.InstanceId);
112-
}
24+
await _restarter.Start(nameof(CrashedWatchdog));
11325
}
11426
}
Lines changed: 6 additions & 102 deletions
Original file line numberDiff line numberDiff line change
@@ -1,116 +1,20 @@
11
using System;
22
using System.Threading.Tasks;
3-
using Cleipnir.ResilientFunctions.CoreRuntime.Invocation;
4-
using Cleipnir.ResilientFunctions.Domain;
5-
using Cleipnir.ResilientFunctions.Domain.Exceptions;
6-
using Cleipnir.ResilientFunctions.Helpers;
7-
using Cleipnir.ResilientFunctions.Storage;
83

94
namespace Cleipnir.ResilientFunctions.CoreRuntime.Watchdogs;
105

116
internal class PostponedWatchdog
127
{
13-
private readonly IFunctionStore _functionStore;
14-
private readonly UnhandledExceptionHandler _unhandledExceptionHandler;
15-
private readonly ShutdownCoordinator _shutdownCoordinator;
16-
private readonly ScheduleReInvokeFromWatchdog _scheduleReInvoke;
17-
private readonly RestartFunction _restartFunction;
18-
private readonly TimeSpan _checkFrequency;
19-
private readonly TimeSpan _delayStartUp;
20-
private readonly FunctionTypeId _functionTypeId;
8+
private readonly Restarter _restarter;
219

22-
private int _maxParallelismLeft;
23-
private readonly object _sync = new();
10+
public PostponedWatchdog(RestarterFactory restarterFactory)
11+
=> _restarter = restarterFactory.Create(
12+
getEligibleFunctions: (functionTypeId, store, t) => store.GetPostponedFunctions(functionTypeId, isEligibleBefore: t)
13+
);
2414

25-
public PostponedWatchdog(
26-
FunctionTypeId functionTypeId,
27-
IFunctionStore functionStore,
28-
ScheduleReInvokeFromWatchdog scheduleReInvoke,
29-
RestartFunction restartFunction,
30-
int maxParallelism,
31-
TimeSpan checkFrequency,
32-
TimeSpan delayStartUp,
33-
UnhandledExceptionHandler unhandledExceptionHandler,
34-
ShutdownCoordinator shutdownCoordinator)
35-
{
36-
_functionTypeId = functionTypeId;
37-
_functionStore = functionStore;
38-
_unhandledExceptionHandler = unhandledExceptionHandler;
39-
_shutdownCoordinator = shutdownCoordinator;
40-
_maxParallelismLeft = maxParallelism;
41-
_scheduleReInvoke = scheduleReInvoke;
42-
_restartFunction = restartFunction;
43-
_checkFrequency = checkFrequency;
44-
_delayStartUp = delayStartUp;
45-
}
4615

4716
public async Task Start()
4817
{
49-
if (_checkFrequency == TimeSpan.Zero) return;
50-
await Task.Delay(_delayStartUp);
51-
52-
Start:
53-
try
54-
{
55-
while (!_shutdownCoordinator.ShutdownInitiated)
56-
{
57-
var now = DateTime.UtcNow;
58-
59-
var eligible =
60-
(await _functionStore.GetPostponedFunctions(_functionTypeId, now.Ticks)).WithRandomOffset();
61-
62-
foreach (var spf in eligible)
63-
{
64-
lock (_sync)
65-
if (_maxParallelismLeft == 0) break;
66-
else _maxParallelismLeft--;
67-
68-
var runningFunction = _shutdownCoordinator.TryRegisterRunningFunction();
69-
if (runningFunction == null)
70-
return;
71-
72-
var functionId = new FunctionId(_functionTypeId, spf.InstanceId);
73-
var restartedFunction = await _restartFunction(functionId, spf.Epoch);
74-
if (restartedFunction == null)
75-
{
76-
runningFunction.Dispose();
77-
break;
78-
}
79-
80-
await _scheduleReInvoke(
81-
spf.InstanceId,
82-
restartedFunction,
83-
onCompletion: () =>
84-
{
85-
lock (_sync)
86-
_maxParallelismLeft++;
87-
88-
runningFunction.Dispose();
89-
}
90-
);
91-
}
92-
93-
var timeElapsed = DateTime.UtcNow - now;
94-
var delay = TimeSpanHelper.Max(
95-
TimeSpan.Zero,
96-
_checkFrequency - timeElapsed
97-
);
98-
99-
await Task.Delay(delay);
100-
}
101-
}
102-
catch (Exception innerException)
103-
{
104-
_unhandledExceptionHandler.Invoke(
105-
new FrameworkException(
106-
_functionTypeId,
107-
$"{nameof(PostponedWatchdog)} for '{_functionTypeId}' failed - retrying in 5 seconds",
108-
innerException
109-
)
110-
);
111-
112-
await Task.Delay(5_000);
113-
goto Start;
114-
}
18+
await _restarter.Start(nameof(PostponedWatchdog));
11519
}
11620
}

0 commit comments

Comments
 (0)