Skip to content

Commit

Permalink
Fix flaky test by restructuring task model (#10145)
Browse files Browse the repository at this point in the history
  • Loading branch information
jviau committed May 16, 2024
1 parent 31032fc commit cd4ad63
Show file tree
Hide file tree
Showing 3 changed files with 41 additions and 23 deletions.
19 changes: 19 additions & 0 deletions src/WebJobs.Script/Extensions/TaskExtensions.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.

using System.Threading.Tasks;

namespace Microsoft.Azure.WebJobs.Script.Extensions
{
internal static class TaskExtensions
{
/// <summary>
/// Forgets the task. This method is used to indicating not awaiting a task is intentional.
/// </summary>
/// <param name="task">The task to forget.</param>
public static void Forget(this Task task)
{
// No op - this method is used to suppress the compiler warning.
}
}
}
44 changes: 22 additions & 22 deletions src/WebJobs.Script/Workers/Http/HttpFunctionInvocationDispatcher.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
using Microsoft.Azure.WebJobs.Script.Description;
using Microsoft.Azure.WebJobs.Script.Diagnostics;
using Microsoft.Azure.WebJobs.Script.Eventing;
using Microsoft.Azure.WebJobs.Script.Extensions;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;

Expand Down Expand Up @@ -68,16 +69,12 @@ internal HttpFunctionInvocationDispatcher()

public int ErrorEventsThreshold { get; private set; }

internal Task InitializeHttpWorkerChannelAsync(int attemptCount, CancellationToken cancellationToken = default)
internal async Task InitializeHttpWorkerChannelAsync(int attemptCount, CancellationToken cancellationToken = default)
{
_httpWorkerChannel = _httpWorkerChannelFactory.Create(_scriptOptions.RootScriptPath, _metricsLogger, attemptCount);
_httpWorkerChannel.StartWorkerProcessAsync(cancellationToken).ContinueWith(workerInitTask =>
{
_logger.LogDebug("Adding http worker channel. workerId:{id}", _httpWorkerChannel.Id);
SetFunctionDispatcherStateToInitializedAndLog();
}, TaskContinuationOptions.OnlyOnRanToCompletion);

return Task.CompletedTask;
await _httpWorkerChannel.StartWorkerProcessAsync(cancellationToken);
_logger.LogDebug("Adding http worker channel. workerId:{id}", _httpWorkerChannel.Id);
SetFunctionDispatcherStateToInitializedAndLog();
}

private void SetFunctionDispatcherStateToInitializedAndLog()
Expand All @@ -86,41 +83,42 @@ private void SetFunctionDispatcherStateToInitializedAndLog()
_logger.LogInformation("Worker process started and initialized.");
}

public async Task InitializeAsync(IEnumerable<FunctionMetadata> functions, CancellationToken cancellationToken = default)
public Task InitializeAsync(IEnumerable<FunctionMetadata> functions, CancellationToken cancellationToken = default)
{
cancellationToken.ThrowIfCancellationRequested();

if (functions == null || !functions.Any())
{
// do not initialize function dispachter if there are no functions
return;
return Task.CompletedTask;
}

State = FunctionInvocationDispatcherState.Initializing;
await InitializeHttpWorkerChannelAsync(0, cancellationToken);
InitializeHttpWorkerChannelAsync(0, cancellationToken).Forget();
return Task.CompletedTask;
}

public Task InvokeAsync(ScriptInvocationContext invocationContext)
{
return _httpWorkerChannel.InvokeAsync(invocationContext);
}

public async void WorkerError(HttpWorkerErrorEvent workerError)
public void WorkerError(HttpWorkerErrorEvent workerError)
{
if (!_disposing)
{
_logger.LogDebug("Handling WorkerErrorEvent for workerId:{workerId}. Failed with: {exception}", workerError.WorkerId, workerError.Exception);
AddOrUpdateErrorBucket(workerError);
await DisposeAndRestartWorkerChannel(workerError.WorkerId);
DisposeAndRestartWorkerChannel(workerError.WorkerId);
}
}

public async void WorkerRestart(HttpWorkerRestartEvent workerRestart)
public void WorkerRestart(HttpWorkerRestartEvent workerRestart)
{
if (!_disposing)
{
_logger.LogDebug("Handling WorkerRestartEvent for workerId:{workerId}", workerRestart.WorkerId);
await DisposeAndRestartWorkerChannel(workerRestart.WorkerId);
DisposeAndRestartWorkerChannel(workerRestart.WorkerId);
}
}

Expand All @@ -130,7 +128,7 @@ public Task StartWorkerChannel()
return Task.CompletedTask;
}

private async Task DisposeAndRestartWorkerChannel(string workerId)
private void DisposeAndRestartWorkerChannel(string workerId)
{
// Since we only have one HTTP worker process, as soon as we dispose it, InvokeAsync will fail. Set state to
// indicate we are not ready to receive new requests.
Expand All @@ -140,15 +138,16 @@ private async Task DisposeAndRestartWorkerChannel(string workerId)
{
(_httpWorkerChannel as IDisposable)?.Dispose();
}
await RestartWorkerChannel(workerId);

RestartWorkerChannel(workerId);
}

private async Task RestartWorkerChannel(string workerId)
private void RestartWorkerChannel(string workerId)
{
if (_invokerErrors.Count < ErrorEventsThreshold)
{
_logger.LogDebug("Restarting http invoker channel");
await InitializeHttpWorkerChannelAsync(_invokerErrors.Count);
InitializeHttpWorkerChannelAsync(_invokerErrors.Count).Forget();
}
else
{
Expand Down Expand Up @@ -207,10 +206,11 @@ public Task ShutdownAsync()
return Task.CompletedTask;
}

public async Task<bool> RestartWorkerWithInvocationIdAsync(string invocationId)
public Task<bool> RestartWorkerWithInvocationIdAsync(string invocationId)
{
await DisposeAndRestartWorkerChannel(_httpWorkerChannel.Id); // Since there's only one channel for httpworker
return true;
// Since there's only one channel for httpworker
DisposeAndRestartWorkerChannel(_httpWorkerChannel.Id);
return Task.FromResult(true);
}

public void PreShutdown()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,6 @@ public async Task TestDelay_StartProcess(bool startWorkerProcessResult)
{
}

await Task.Delay(TimeSpan.FromMilliseconds(500));
if (startWorkerProcessResult)
{
Assert.Equal(dispatcher.State, FunctionInvocationDispatcherState.Initialized);
Expand Down

0 comments on commit cd4ad63

Please sign in to comment.