Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

improve some task error handling, mostly InputServer #388

Merged
merged 2 commits into from
Jan 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
267 changes: 143 additions & 124 deletions TPP.Core/InputServer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,152 +8,171 @@
using TPP.Inputting;
using InputMap = System.Collections.Generic.IDictionary<string, object>;

namespace TPP.Core
namespace TPP.Core;

public sealed class InputServer : IDisposable
{
public sealed class InputServer : IDisposable
{
private readonly ILogger<InputServer> _logger;
private readonly string _host;
private readonly int _port;
private readonly MuteInputsToken _muteInputsToken;
private readonly Func<IInputFeed> _inputFeedSupplier;
private readonly ILogger<InputServer> _logger;
private readonly string _host;
private readonly int _port;
private readonly MuteInputsToken _muteInputsToken;
private readonly Func<IInputFeed> _inputFeedSupplier;

private HttpListener? _httpListener;
private bool _stopped = false;
private HttpListener? _httpListener;

public InputServer(
ILogger<InputServer> logger,
string host, int port,
MuteInputsToken muteInputsToken,
Func<IInputFeed> inputFeedSupplier)
public InputServer(
ILogger<InputServer> logger,
string host, int port,
MuteInputsToken muteInputsToken,
Func<IInputFeed> inputFeedSupplier)
{
_logger = logger;
if (host is "0.0.0.0" or "::")
{
_logger = logger;
if (host is "0.0.0.0" or "::")
{
logger.LogWarning(
"Configured input server host as '{Host}', but the host is being used as a http listener prefix, " +
"not as a bind address. Assuming '*' instead to listen on all interfaces", host);
host = "*";
}
if (host is "localhost")
logger.LogWarning("Configured input server host as '{Host}' instead of '127.0.0.1. " +
"It might not be reachable from 127.0.0.1", host);
_host = host;
_port = port;
_muteInputsToken = muteInputsToken;
_inputFeedSupplier = inputFeedSupplier;
logger.LogWarning(
"Configured input server host as '{Host}', but the host is being used as a http listener prefix, " +
"not as a bind address. Assuming '*' instead to listen on all interfaces", host);
host = "*";
}
if (host is "localhost")
logger.LogWarning("Configured input server host as '{Host}' instead of '127.0.0.1. " +
"It might not be reachable from 127.0.0.1", host);
_host = host;
_port = port;
_muteInputsToken = muteInputsToken;
_inputFeedSupplier = inputFeedSupplier;
}

/// <summary>
/// Keeps responding to new incoming requests until the server is stopped with <see cref="Stop"/>.
/// </summary>
public async Task Listen()
/// <summary>
/// Keeps responding to new incoming requests until the server is stopped with <see cref="Stop"/>.
/// </summary>
public async Task Listen()
{
if (_httpListener != null)
throw new InvalidOperationException("Cannot listen: The internal http listener is already running!");
_httpListener = new HttpListener();

// Stop any lingering connections from filling the request queue
_httpListener.TimeoutManager.IdleConnection = TimeSpan.FromSeconds(1);
if (RuntimeInformation.IsOSPlatform(OSPlatform.Windows))
{
if (_httpListener != null)
throw new InvalidOperationException("Cannot listen: The internal http listener is already running!");
_httpListener = new HttpListener();
_httpListener.TimeoutManager.RequestQueue = TimeSpan.FromMilliseconds(100);
_httpListener.TimeoutManager.HeaderWait = TimeSpan.FromMilliseconds(100);
_httpListener.TimeoutManager.EntityBody = TimeSpan.FromMilliseconds(100);
}

// Stop any lingering connections from filling the request queue
_httpListener.TimeoutManager.IdleConnection = TimeSpan.FromSeconds(1);
if (RuntimeInformation.IsOSPlatform(OSPlatform.Windows))
{
_httpListener.TimeoutManager.RequestQueue = TimeSpan.FromMilliseconds(100);
_httpListener.TimeoutManager.HeaderWait = TimeSpan.FromMilliseconds(100);
_httpListener.TimeoutManager.EntityBody = TimeSpan.FromMilliseconds(100);
}
_httpListener.Prefixes.Add($"http://{_host}:{_port}/");
_httpListener.Start();
_logger.LogInformation("Started input server on {Prefixes}", _httpListener.Prefixes);

_httpListener.Prefixes.Add($"http://{_host}:{_port}/");
_httpListener.Start();
_logger.LogInformation("Started input server on {Prefixes}", _httpListener.Prefixes);
// put responding to input requests on its own thread to avoid any potential
// delays due to cooperative multitasking
await Task.Run(async () =>
{

// put responding to input requests on its own thread to avoid any potential
// delays due to cooperative multitasking
await Task.Run(async () =>
while (_httpListener.IsListening)
{

while (_httpListener.IsListening)
HttpListenerContext context;
try
{
context = await _httpListener.GetContextAsync();
}
catch (HttpListenerException)
{
_logger.LogDebug("input server listener was stopped");
return;
}
catch (ObjectDisposedException)
{
HttpListenerContext context;
try
{
context = await _httpListener.GetContextAsync();
}
catch (HttpListenerException)
{
_logger.LogDebug("input server listener was stopped");
return;
}
catch (ObjectDisposedException)
{
// GetContextAsync doesn't take a cancellation token,
// and stopping the http server can cause it to trip over itself for some reason.
_logger.LogDebug("Encountered ObjectDisposedException while accepting an incoming connection");
return;
}
// GetContextAsync doesn't take a cancellation token,
// and stopping the http server can cause it to trip over itself for some reason.
_logger.LogDebug("Encountered ObjectDisposedException while accepting an incoming connection");
return;
}

HttpListenerRequest request = context.Request;
HttpListenerResponse response = context.Response;
try
{
const long timeoutMs = 1000;
Task handleConnection = HandleSingleConnection(context.Request, context.Response);
Task timeout = Task.Delay(TimeSpan.FromMilliseconds(timeoutMs));
if (await Task.WhenAny(handleConnection, timeout) == timeout)
throw new InvalidOperationException($"Request took too long, timeout was {timeoutMs}ms");
}
catch (InvalidOperationException ex)
{
_logger.LogWarning(ex, "Encountered an error handling an incoming request, dropping connection");
context.Response.Close();
}
}
});

string? responseText;
string? requestUrl = request.RawUrl?.ToLower();
try
{
if (requestUrl == "/start_run")
{
_muteInputsToken.Muted = false;
responseText = "ok";
}
else if (requestUrl == "/stop_run")
{
_muteInputsToken.Muted = true;
responseText = "ok";
}
else
{
InputMap? inputMap = await _inputFeedSupplier().HandleRequest(requestUrl);
responseText = inputMap == null ? null : JsonSerializer.Serialize(inputMap);
}
}
catch (ArgumentException ex)
{
byte[] buffer = Encoding.UTF8.GetBytes(ex.Message);
try
{
response.ContentLength64 = buffer.Length;
await response.OutputStream.WriteAsync(buffer.AsMemory(0, buffer.Length));
response.StatusCode = 400;
response.Close();
}
catch (HttpListenerException httpEx)
{
_logger.LogError(httpEx,
"Failed to send input listener exception as response: {Exception}", ex.ToString());
}
continue;
}
if (!_stopped)
throw new InvalidOperationException(
"Unexpectedly encountered a graceful shutdown (listening ended but no stop was requested)");
}

if (responseText != null)
{
byte[] buffer = Encoding.UTF8.GetBytes(responseText);
response.ContentLength64 = buffer.Length;
await response.OutputStream.WriteAsync(buffer.AsMemory(0, buffer.Length));
}
response.Close();
}
});
private async Task HandleSingleConnection(HttpListenerRequest request, HttpListenerResponse response)
{
string? responseText;
string? requestUrl = request.RawUrl?.ToLower();
try
{
if (requestUrl == "/start_run")
{
_muteInputsToken.Muted = false;
responseText = "ok";
}
else if (requestUrl == "/stop_run")
{
_muteInputsToken.Muted = true;
responseText = "ok";
}
else
{
InputMap? inputMap = await _inputFeedSupplier().HandleRequest(requestUrl);
responseText = inputMap == null ? null : JsonSerializer.Serialize(inputMap);
}
}

public void Stop()
catch (ArgumentException ex)
{
if (_httpListener is { IsListening: true })
byte[] buffer = Encoding.UTF8.GetBytes(ex.Message);
try
{
response.ContentLength64 = buffer.Length;
await response.OutputStream.WriteAsync(buffer.AsMemory(0, buffer.Length));
response.StatusCode = 400;
response.Close();
}
catch (HttpListenerException httpEx)
{
_httpListener.Stop();
_logger.LogError(httpEx,
"Failed to send input listener exception as response: {Exception}", ex.ToString());
}
_httpListener = null;
return;
}

public void Dispose()
if (responseText != null)
{
Stop();
byte[] buffer = Encoding.UTF8.GetBytes(responseText);
response.ContentLength64 = buffer.Length;
await response.OutputStream.WriteAsync(buffer.AsMemory(0, buffer.Length));
}
response.Close();
}

public void Stop()
{
_stopped = true;
if (_httpListener is { IsListening: true })
{
_httpListener.Stop();
}
_httpListener = null;
}

public void Dispose()
{
Stop();
}
}
81 changes: 41 additions & 40 deletions TPP.Core/Modes/DualcoreMode.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,54 +6,55 @@
using TPP.Core.Configuration;
using TPP.Core.Overlay;

namespace TPP.Core.Modes
namespace TPP.Core.Modes;

public sealed class DualcoreMode : IMode, IDisposable
{
public sealed class DualcoreMode : IMode, IDisposable
{
private readonly ILogger<DualcoreMode> _logger;
private readonly StopToken _stopToken;
private readonly ModeBase _modeBase;
private readonly WebsocketBroadcastServer _broadcastServer;
private readonly DatabaseLock _databaseLock;
private readonly ILogger<DualcoreMode> _logger;
private readonly StopToken _stopToken;
private readonly ModeBase _modeBase;
private readonly WebsocketBroadcastServer _broadcastServer;
private readonly DatabaseLock _databaseLock;

public DualcoreMode(ILoggerFactory loggerFactory, BaseConfig baseConfig)
{
_logger = loggerFactory.CreateLogger<DualcoreMode>();
_stopToken = new StopToken();
Setups.Databases repos = Setups.SetUpRepositories(loggerFactory, _logger, baseConfig);
OverlayConnection overlayConnection;
(_broadcastServer, overlayConnection) = Setups.SetUpOverlayServer(loggerFactory,
baseConfig.OverlayWebsocketHost, baseConfig.OverlayWebsocketPort);
_modeBase = new ModeBase(loggerFactory, repos, baseConfig, _stopToken, null, overlayConnection);
_databaseLock = new DatabaseLock(
loggerFactory.CreateLogger<DatabaseLock>(), SystemClock.Instance, repos.KeyValueStore);
}
public DualcoreMode(ILoggerFactory loggerFactory, BaseConfig baseConfig)
{
_logger = loggerFactory.CreateLogger<DualcoreMode>();
_stopToken = new StopToken();
Setups.Databases repos = Setups.SetUpRepositories(loggerFactory, _logger, baseConfig);
OverlayConnection overlayConnection;
(_broadcastServer, overlayConnection) = Setups.SetUpOverlayServer(loggerFactory,
baseConfig.OverlayWebsocketHost, baseConfig.OverlayWebsocketPort);
_modeBase = new ModeBase(loggerFactory, repos, baseConfig, _stopToken, null, overlayConnection);
_databaseLock = new DatabaseLock(
loggerFactory.CreateLogger<DatabaseLock>(), SystemClock.Instance, repos.KeyValueStore);
}

public async Task Run()
public async Task Run()
{
await using IAsyncDisposable dbLock = await _databaseLock.Acquire();
_logger.LogInformation("Dualcore mode starting");
_modeBase.Start();
Task overlayWebsocketTask = _broadcastServer.Listen();
Task handleStopTask = Task.Run(async () =>
{
await using IAsyncDisposable dbLock = await _databaseLock.Acquire();
_logger.LogInformation("Dualcore mode starting");
_modeBase.Start();
Task overlayWebsocketTask = _broadcastServer.Listen();
while (!_stopToken.ShouldStop)
{
// there is no sequence, just busyloop
// Just wait until it is time to shut everything down
await Task.Delay(TimeSpan.FromMilliseconds(100));
}
await _broadcastServer.Stop();
await overlayWebsocketTask;
_logger.LogInformation("Dualcore mode ended");
}
});
// Must wait on all concurrently running tasks simultaneously to know when one of them crashed
await Task.WhenAll(handleStopTask, overlayWebsocketTask);
_logger.LogInformation("Dualcore mode ended");
}

public void Cancel()
{
// there main loop is basically busylooping, so we can just tell it to stop
_stopToken.ShouldStop = true;
}
public void Cancel()
{
// there main loop is basically busylooping, so we can just tell it to stop
_stopToken.ShouldStop = true;
}

public void Dispose()
{
_modeBase.Dispose();
}
public void Dispose()
{
_modeBase.Dispose();
}
}
Loading
Loading