Skip to content

Commit 878bef8

Browse files
authored
Fix GrpcWorkerChannel.StartWorkerProcessAsync timeout (#10937)
* Fix GrpcWorkerChannel.StartWorkerProcessAsync timeout
* Add unit tests for worker exit
* Fix expectations
* Skip artifact on reruns
* Fix tests setting env vars
* Refactor ScriptStartupTypeDiscovererTests for env isolation
* Fix more unit tests
* Update new test method
* Update release notes
* Swallow Publish event exceptions
* Fix test warning, add exit code
* Ensure test process is cleaned up
* Revert "Ensure test process is cleaned up". This reverts commit a816a7f.
* Add crash blame for debugging
* Only close process on success
* Revert --blame-crash change
1 parent 90a2950 commit 878bef8

26 files changed

+966
-1327
lines changed

Directory.Build.targets

Lines changed: 4 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -2,4 +2,8 @@
22

33
<Import Project="$(EngBuildRoot)Engineering.targets" />
44

5+
<PropertyGroup>
6+
<VSTestResultsDirectory>$(ArtifactsPath)/log/$(ArtifactsProjectName)/tests_$(ArtifactsPivots)/</VSTestResultsDirectory>
7+
</PropertyGroup>
8+
59
</Project>

eng/ci/templates/jobs/run-unit-tests.yml

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -16,7 +16,7 @@ jobs:
1616
displayName: Publish deps.json
1717
path: $(Build.ArtifactStagingDirectory)
1818
artifact: WebHost_Deps
19-
condition: failed()
19+
condition: and(failed(), eq(variables['System.JobAttempt'], 1)) # only publish on first attempt
2020

2121
steps:
2222
- template: /eng/ci/templates/install-dotnet.yml@self

release_notes.md

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -5,4 +5,5 @@
55
-->
66
- Adding a "web app" configuration profile (#11447)
77
- Add JitTrace Files for v4.1045
8+
- Throw exception instead of timing out when worker channel exits before initializing gRPC (#10937)
89
- Adding empty remote message check in the SystemLogger (#11473)

src/WebJobs.Script.Grpc/Channel/GrpcWorkerChannel.cs

Lines changed: 26 additions & 7 deletions
Original file line number | Diff line number | Diff line change
@@ -23,7 +23,6 @@
2323
using Microsoft.Azure.WebJobs.Script.Diagnostics;
2424
using Microsoft.Azure.WebJobs.Script.Diagnostics.OpenTelemetry;
2525
using Microsoft.Azure.WebJobs.Script.Eventing;
26-
using Microsoft.Azure.WebJobs.Script.Exceptions;
2726
using Microsoft.Azure.WebJobs.Script.Extensions;
2827
using Microsoft.Azure.WebJobs.Script.Grpc.Eventing;
2928
using Microsoft.Azure.WebJobs.Script.Grpc.Extensions;
@@ -367,19 +366,39 @@ private void DispatchMessage(InboundGrpcEvent msg)
367366

368367
public bool IsChannelReadyForInvocations()
369368
{
370-
return !_disposing && !_disposed && _state.HasFlag(RpcWorkerChannelState.InvocationBuffersInitialized | RpcWorkerChannelState.Initialized);
369+
return !_disposing && !_disposed
370+
&& _state.HasFlag(
371+
RpcWorkerChannelState.InvocationBuffersInitialized | RpcWorkerChannelState.Initialized);
371372
}
372373

373374
public async Task StartWorkerProcessAsync(CancellationToken cancellationToken)
374375
{
375-
RegisterCallbackForNextGrpcMessage(MsgType.StartStream, _workerConfig.CountOptions.ProcessStartupTimeout, 1, SendWorkerInitRequest, HandleWorkerStartStreamError);
376-
// note: it is important that the ^^^ StartStream is in place *before* we start process the loop, otherwise we get a race condition
376+
RegisterCallbackForNextGrpcMessage(
377+
MsgType.StartStream,
378+
_workerConfig.CountOptions.ProcessStartupTimeout,
379+
count: 1,
380+
SendWorkerInitRequest,
381+
HandleWorkerStartStreamError);
382+
383+
// note: it is important that the ^^^ StartStream is in place *before* we start process the loop,
384+
// otherwise we get a race condition
377385
_ = ProcessInbound();
378386

379387
_workerChannelLogger.LogDebug("Initiating Worker Process start up");
380-
await _rpcWorkerProcess.StartProcessAsync();
381-
_state = _state | RpcWorkerChannelState.Initializing;
382-
await _workerInitTask.Task;
388+
await _rpcWorkerProcess.StartProcessAsync(cancellationToken);
389+
_state |= RpcWorkerChannelState.Initializing;
390+
Task<int> exited = _rpcWorkerProcess.WaitForExitAsync(cancellationToken);
391+
Task winner = await Task.WhenAny(_workerInitTask.Task, exited).WaitAsync(cancellationToken);
392+
await winner;
393+
394+
if (winner == exited)
395+
{
396+
// Process exited without throwing. We need to throw to indicate process is not running.
397+
throw new WorkerProcessExitException("Worker process exited before initializing.")
398+
{
399+
ExitCode = await exited,
400+
};
401+
}
383402
}
384403

385404
public async Task<WorkerStatus> GetWorkerStatusAsync()

src/WebJobs.Script.Grpc/ProcessManagement/IWorkerProcess.cs

Lines changed: 4 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -2,6 +2,7 @@
22
// Licensed under the MIT License. See License.txt in the project root for license information.
33

44
using System.Diagnostics;
5+
using System.Threading;
56
using System.Threading.Tasks;
67

78
namespace Microsoft.Azure.WebJobs.Script.Workers
@@ -12,7 +13,9 @@ internal interface IWorkerProcess
1213

1314
Process Process { get; }
1415

15-
Task StartProcessAsync();
16+
Task StartProcessAsync(CancellationToken cancellationToken = default);
17+
18+
Task<int> WaitForExitAsync(CancellationToken cancellationToken = default);
1619

1720
void WaitForProcessExitInMilliSeconds(int waitTime);
1821
}

src/WebJobs.Script.Grpc/ProcessManagement/WorkerProcess.cs

Lines changed: 57 additions & 19 deletions
Original file line number | Diff line number | Diff line change
@@ -7,6 +7,7 @@
77
using System.IO;
88
using System.Linq;
99
using System.Reactive.Linq;
10+
using System.Threading;
1011
using System.Threading.Tasks;
1112
using Microsoft.Azure.WebJobs.Host.Scale;
1213
using Microsoft.Azure.WebJobs.Logging;
@@ -33,10 +34,11 @@ internal abstract class WorkerProcess : IWorkerProcess, IDisposable
3334
private readonly IEnvironment _environment;
3435
private readonly IOptionsMonitor<ScriptApplicationHostOptions> _scriptApplicationHostOptions;
3536

36-
private bool _useStdErrorStreamForErrorsOnly;
37-
private Queue<string> _processStdErrDataQueue = new Queue<string>(3);
37+
private readonly object _syncLock = new();
38+
private readonly bool _useStdErrorStreamForErrorsOnly;
39+
private Queue<string> _processStdErrDataQueue = new(3);
3840
private IHostProcessMonitor _processMonitor;
39-
private object _syncLock = new object();
41+
private TaskCompletionSource<int> _processExit; // used to hold custom exceptions on non-success exit.
4042

4143
internal WorkerProcess(IScriptEventManager eventManager, IProcessRegistry processRegistry, ILogger workerProcessLogger, IWorkerConsoleLogSource consoleLogSource, IMetricsLogger metricsLogger,
4244
IServiceProvider serviceProvider, ILoggerFactory loggerFactory, IEnvironment environment, IOptionsMonitor<ScriptApplicationHostOptions> scriptApplicationHostOptions, bool useStdErrStreamForErrorsOnly = false)
@@ -69,8 +71,9 @@ internal WorkerProcess(IScriptEventManager eventManager, IProcessRegistry proces
6971

7072
internal abstract Process CreateWorkerProcess();
7173

72-
public Task StartProcessAsync()
74+
public Task StartProcessAsync(CancellationToken cancellationToken = default)
7375
{
76+
cancellationToken.ThrowIfCancellationRequested();
7477
using (_metricsLogger.LatencyEvent(MetricEventNames.ProcessStart))
7578
{
7679
Process = CreateWorkerProcess();
@@ -79,11 +82,12 @@ public Task StartProcessAsync()
7982
AssignUserExecutePermissionsIfNotExists();
8083
}
8184

85+
_processExit = new();
8286
try
8387
{
84-
Process.ErrorDataReceived += (sender, e) => OnErrorDataReceived(sender, e);
85-
Process.OutputDataReceived += (sender, e) => OnOutputDataReceived(sender, e);
86-
Process.Exited += (sender, e) => OnProcessExited(sender, e);
88+
Process.ErrorDataReceived += OnErrorDataReceived;
89+
Process.OutputDataReceived += OnOutputDataReceived;
90+
Process.Exited += OnProcessExited;
8791
Process.EnableRaisingEvents = true;
8892
string sanitizedArguments = Sanitizer.Sanitize(Process.StartInfo.Arguments);
8993

@@ -103,12 +107,25 @@ public Task StartProcessAsync()
103107
}
104108
catch (Exception ex)
105109
{
110+
_processExit.TrySetException(ex);
106111
_workerProcessLogger.LogError(ex, $"Failed to start Worker Channel. Process fileName: {Process.StartInfo.FileName}");
107112
return Task.FromException(ex);
108113
}
109114
}
110115
}
111116

117+
public Task<int> WaitForExitAsync(CancellationToken cancellationToken = default)
118+
{
119+
ObjectDisposedException.ThrowIf(Disposing, this);
120+
if (_processExit is { } tcs)
121+
{
122+
// We use a TaskCompletionSource (and not Process.WaitForExitAsync) so we can propagate our custom exceptions.
123+
return tcs.Task.WaitAsync(cancellationToken);
124+
}
125+
126+
throw new InvalidOperationException("Process has not been started yet.");
127+
}
128+
112129
private void OnErrorDataReceived(object sender, DataReceivedEventArgs e)
113130
{
114131
if (e.Data != null)
@@ -159,12 +176,15 @@ private void OnProcessExited(object sender, EventArgs e)
159176

160177
if (Disposing)
161178
{
162-
// No action needed
163179
return;
164180
}
165181

182+
int exit = 0;
166183
try
167184
{
185+
ThrowIfExitError();
186+
187+
exit = Process.ExitCode;
168188
if (Process.ExitCode == WorkerConstants.SuccessExitCode)
169189
{
170190
Process.WaitForExit();
@@ -174,27 +194,45 @@ private void OnProcessExited(object sender, EventArgs e)
174194
{
175195
HandleWorkerProcessRestart();
176196
}
177-
else
178-
{
179-
string exceptionMessage = string.Join(",", _processStdErrDataQueue.Where(s => !string.IsNullOrEmpty(s)));
180-
string sanitizedExceptionMessage = Sanitizer.Sanitize(exceptionMessage);
181-
var processExitEx = new WorkerProcessExitException($"{Process.StartInfo.FileName} exited with code {Process.ExitCode} (0x{Process.ExitCode.ToString("X")})", new Exception(sanitizedExceptionMessage));
182-
processExitEx.ExitCode = Process.ExitCode;
183-
processExitEx.Pid = Process.Id;
184-
HandleWorkerProcessExitError(processExitEx);
185-
}
197+
}
198+
catch (WorkerProcessExitException processExitEx)
199+
{
200+
_processExit.TrySetException(processExitEx);
201+
HandleWorkerProcessExitError(processExitEx);
186202
}
187203
catch (Exception exc)
188204
{
189-
_workerProcessLogger?.LogDebug(exc, "Exception on worker process exit. Process id: {processId}", Process?.Id);
190205
// ignore process is already disposed
206+
_processExit.TrySetException(exc);
207+
_workerProcessLogger?.LogDebug(exc, "Exception on worker process exit. Process id: {processId}", Process?.Id);
191208
}
192209
finally
193210
{
211+
_processExit.TrySetResult(exit);
194212
UnregisterFromProcessMonitor();
195213
}
196214
}
197215

216+
private void ThrowIfExitError()
217+
{
218+
if (Process.ExitCode is WorkerConstants.SuccessExitCode or WorkerConstants.IntentionalRestartExitCode)
219+
{
220+
return;
221+
}
222+
223+
string exceptionMessage = string.Join(",", _processStdErrDataQueue.Where(s => !string.IsNullOrEmpty(s)));
224+
string sanitizedExceptionMessage = Sanitizer.Sanitize(exceptionMessage);
225+
WorkerProcessExitException processExitEx = new(
226+
$"{Process.StartInfo.FileName} exited with code {Process.ExitCode} (0x{Process.ExitCode:X})",
227+
new Exception(sanitizedExceptionMessage))
228+
{
229+
ExitCode = Process.ExitCode,
230+
Pid = Process.Id
231+
};
232+
233+
throw processExitEx;
234+
}
235+
198236
private void OnOutputDataReceived(object sender, DataReceivedEventArgs e)
199237
{
200238
if (e.Data != null)
@@ -343,7 +381,7 @@ private void AssignUserExecutePermissionsIfNotExists()
343381
return;
344382
}
345383

346-
UnixFileInfo fileInfo = new UnixFileInfo(filePath);
384+
UnixFileInfo fileInfo = new(filePath);
347385
if (!fileInfo.FileAccessPermissions.HasFlag(FileAccessPermissions.UserExecute))
348386
{
349387
_workerProcessLogger.LogDebug("Assigning execute permissions to file: {filePath}", filePath);

src/WebJobs.Script.Grpc/Rpc/RpcWorkerProcess.cs

Lines changed: 35 additions & 24 deletions
Original file line number | Diff line number | Diff line change
@@ -26,22 +26,23 @@ internal class RpcWorkerProcess : WorkerProcess
2626
private readonly IOptions<FunctionsHostingConfigOptions> _hostingConfigOptions;
2727
private readonly IEnvironment _environment;
2828

29-
internal RpcWorkerProcess(string runtime,
30-
string workerId,
31-
string rootScriptPath,
32-
Uri serverUri,
33-
RpcWorkerConfig rpcWorkerConfig,
34-
IScriptEventManager eventManager,
35-
IWorkerProcessFactory processFactory,
36-
IProcessRegistry processRegistry,
37-
ILogger workerProcessLogger,
38-
IWorkerConsoleLogSource consoleLogSource,
39-
IMetricsLogger metricsLogger,
40-
IServiceProvider serviceProvider,
41-
IOptions<FunctionsHostingConfigOptions> hostingConfigOptions,
42-
IEnvironment environment,
43-
IOptionsMonitor<ScriptApplicationHostOptions> scriptApplicationHostOptions,
44-
ILoggerFactory loggerFactory)
29+
internal RpcWorkerProcess(
30+
string runtime,
31+
string workerId,
32+
string rootScriptPath,
33+
Uri serverUri,
34+
RpcWorkerConfig rpcWorkerConfig,
35+
IScriptEventManager eventManager,
36+
IWorkerProcessFactory processFactory,
37+
IProcessRegistry processRegistry,
38+
ILogger workerProcessLogger,
39+
IWorkerConsoleLogSource consoleLogSource,
40+
IMetricsLogger metricsLogger,
41+
IServiceProvider serviceProvider,
42+
IOptions<FunctionsHostingConfigOptions> hostingConfigOptions,
43+
IEnvironment environment,
44+
IOptionsMonitor<ScriptApplicationHostOptions> scriptApplicationHostOptions,
45+
ILoggerFactory loggerFactory)
4546
: base(eventManager, processRegistry, workerProcessLogger, consoleLogSource, metricsLogger, serviceProvider, loggerFactory, environment,
4647
scriptApplicationHostOptions, rpcWorkerConfig.Description.UseStdErrorStreamForErrorsOnly)
4748
{
@@ -74,23 +75,33 @@ internal override Process CreateWorkerProcess()
7475

7576
internal override void HandleWorkerProcessExitError(WorkerProcessExitException rpcWorkerProcessExitException)
7677
{
78+
ArgumentNullException.ThrowIfNull(rpcWorkerProcessExitException);
7779
if (Disposing)
7880
{
7981
return;
8082
}
81-
if (rpcWorkerProcessExitException == null)
82-
{
83-
throw new ArgumentNullException(nameof(rpcWorkerProcessExitException));
84-
}
83+
8584
// The subscriber of WorkerErrorEvent is expected to Dispose() the errored channel
86-
_workerProcessLogger.LogError(rpcWorkerProcessExitException, $"Language Worker Process exited. Pid={rpcWorkerProcessExitException.Pid}.", _workerProcessArguments.ExecutablePath);
87-
_eventManager.Publish(new WorkerErrorEvent(_runtime, _workerId, rpcWorkerProcessExitException));
85+
_workerProcessLogger.LogError(rpcWorkerProcessExitException, $"Language Worker Process exited. Pid={rpcWorkerProcessExitException.Pid}.", _workerProcessArguments?.ExecutablePath);
86+
PublishNoThrow(new WorkerErrorEvent(_runtime, _workerId, rpcWorkerProcessExitException));
8887
}
8988

9089
internal override void HandleWorkerProcessRestart()
9190
{
9291
_workerProcessLogger?.LogInformation("Language Worker Process exited and needs to be restarted.");
93-
_eventManager.Publish(new WorkerRestartEvent(_runtime, _workerId));
92+
PublishNoThrow(new WorkerRestartEvent(_runtime, _workerId));
93+
}
94+
95+
private void PublishNoThrow(RpcChannelEvent @event)
96+
{
97+
try
98+
{
99+
_eventManager.Publish(@event);
100+
}
101+
catch (Exception ex)
102+
{
103+
_workerProcessLogger.LogWarning(ex, "Failed to publish RpcChannelEvent.");
104+
}
94105
}
95106
}
96-
}
107+
}

src/WebJobs.Script.Grpc/WorkerFunctionMetadataProvider.cs

Lines changed: 7 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -77,7 +77,7 @@ public async Task<FunctionMetadataResult> GetFunctionMetadataAsync(IEnumerable<R
7777
// forceRefresh will be false when bundle is not used (dotnet and dotnet-isolated).
7878
if (!_environment.IsPlaceholderModeEnabled() && forceRefresh && !_scriptOptions.CurrentValue.IsFileSystemReadOnly)
7979
{
80-
_channelManager.ShutdownChannelsAsync().GetAwaiter().GetResult();
80+
await _channelManager.ShutdownChannelsAsync();
8181
}
8282

8383
var channels = _channelManager.GetChannels(_workerRuntime);
@@ -107,6 +107,7 @@ public async Task<FunctionMetadataResult> GetFunctionMetadataAsync(IEnumerable<R
107107
throw new InvalidOperationException($"No initialized language worker channel found for runtime: {_workerRuntime}.");
108108
}
109109

110+
List<Exception> errors = null;
110111
foreach (string workerId in channels.Keys.ToList())
111112
{
112113
if (channels.TryGetValue(workerId, out TaskCompletionSource<IRpcWorkerChannel> languageWorkerChannelTask))
@@ -129,7 +130,7 @@ public async Task<FunctionMetadataResult> GetFunctionMetadataAsync(IEnumerable<R
129130
}
130131

131132
_functions = functions.ToImmutableArray();
132-
_logger.FunctionsReturnedByProvider(_functions.IsDefault ? 0 : _functions.Count(), _metadataProviderName);
133+
_logger.FunctionsReturnedByProvider(_functions.Length, _metadataProviderName);
133134

134135
// Validate if the app has functions in legacy format and add in logs to inform about the mixed app
135136
_ = Task.Delay(TimeSpan.FromMinutes(1)).ContinueWith(t => ValidateFunctionAppFormat(_scriptOptions.CurrentValue.ScriptPath, _logger, _environment));
@@ -140,9 +141,13 @@ public async Task<FunctionMetadataResult> GetFunctionMetadataAsync(IEnumerable<R
140141
{
141142
_logger.LogWarning(ex, "Removing errored webhost language worker channel for runtime: {workerRuntime} workerId:{workerId}", _workerRuntime, workerId);
142143
await _channelManager.ShutdownChannelIfExistsAsync(_workerRuntime, workerId, ex);
144+
errors ??= [];
145+
errors.Add(ex);
143146
}
144147
}
145148
}
149+
150+
ExceptionExtensions.ThrowIfErrorsPresent(errors, "Errors getting function metadata from workers.");
146151
}
147152

148153
return new FunctionMetadataResult(useDefaultMetadataIndexing: false, _functions);

0 commit comments

Comments
 (0)