Skip to content

Commit

Permalink
Merge pull request #392 from TwitchPlaysPokemon/chat_connection_updates
Browse files Browse the repository at this point in the history
refactor chat connections and allow dynamically joining chats
  • Loading branch information
m4-used-rollout authored Feb 22, 2024
2 parents a4f6416 + f0e8205 commit f946a77
Show file tree
Hide file tree
Showing 51 changed files with 1,338 additions and 959 deletions.
69 changes: 29 additions & 40 deletions TPP.Core/AdvertisePollsWorker.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
using System.Collections.Immutable;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.Logging;
using NodaTime;
using TPP.Core.Chat;
using TPP.Core.Commands.Definitions;
Expand All @@ -13,67 +14,55 @@ namespace TPP.Core
/// <summary>
/// Advertises polls in chat on an interval.
/// </summary>
public sealed class AdvertisePollsWorker : IDisposable
public sealed class AdvertisePollsWorker : IWithLifecycle
{
private readonly ILogger<AdvertisePollsWorker> _logger;
private readonly Duration _interval;
private readonly IPollRepo _pollRepo;
private readonly IMessageSender _messageSender;

private readonly CancellationTokenSource _cancellationTokenSource;
private Task? _runTask = null;

public AdvertisePollsWorker(Duration interval, IPollRepo pollRepo, IMessageSender messageSender)
public AdvertisePollsWorker(ILogger<AdvertisePollsWorker> logger, Duration interval, IPollRepo pollRepo,
IMessageSender messageSender)
{
_logger = logger;
_interval = interval;
_pollRepo = pollRepo;
_messageSender = messageSender;
_cancellationTokenSource = new CancellationTokenSource();
}

public void Start()
{
if (_runTask != null) throw new InvalidOperationException("the worker is already running!");
_runTask = Run();
}

private async Task Stop()
public async Task Start(CancellationToken cancellationToken)
{
if (_runTask == null) throw new InvalidOperationException("the worker is not running!");
_cancellationTokenSource.Cancel();
try
try { await Task.Delay(_interval.ToTimeSpan(), cancellationToken); }
catch (OperationCanceledException) { return; }
while (!cancellationToken.IsCancellationRequested)
{
await _runTask;
}
catch (OperationCanceledException)
{
}
}

public async Task Run()
{
while (!_cancellationTokenSource.Token.IsCancellationRequested)
{
await Task.Delay(_interval.ToTimeSpan(), _cancellationTokenSource.Token);
IImmutableList<Poll> polls = await _pollRepo.FindPolls(onlyActive: true);
if (polls.Count == 0) continue;
if (polls.Count == 1)
try
{
await _messageSender.SendMessage(
"Please vote in the currently active poll: " +
PollCommands.FormatSinglePollAdvertisement(polls[0]));
await DoLoop();
}
else
catch (Exception ex)
{
await _messageSender.SendMessage(PollCommands.FormatPollsAdvertisement(polls));
_logger.LogError(ex, "Failed to advertise polls");
}
try { await Task.Delay(_interval.ToTimeSpan(), cancellationToken); }
catch (OperationCanceledException) { break; }
}
}

public void Dispose()
private async Task DoLoop()
{
if (_runTask != null) Stop().Wait();
_cancellationTokenSource.Dispose();
_runTask?.Dispose();
IImmutableList<Poll> polls = await _pollRepo.FindPolls(onlyActive: true);
if (polls.Count == 0) return;
if (polls.Count == 1)
{
await _messageSender.SendMessage(
"Please vote in the currently active poll: " +
PollCommands.FormatSinglePollAdvertisement(polls[0]));
}
else
{
await _messageSender.SendMessage(PollCommands.FormatPollsAdvertisement(polls));
}
}
}
}
64 changes: 23 additions & 41 deletions TPP.Core/Chat/ChatFactory.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,47 +6,29 @@
using TPP.Model;
using TPP.Persistence;

namespace TPP.Core.Chat
{
public class ChatFactory
{
private readonly ILoggerFactory _loggerFactory;
private readonly IClock _clock;
private readonly IUserRepo _userRepo;
private readonly IChattersSnapshotsRepo _chattersSnapshotsRepo;
private readonly IBank<User> _tokenBank;
private readonly ISubscriptionLogRepo _subscriptionLogRepo;
private readonly ILinkedAccountRepo _linkedAccountRepo;
private readonly OverlayConnection _overlayConnection;
namespace TPP.Core.Chat;

public ChatFactory(
ILoggerFactory loggerFactory, IClock clock, IUserRepo userRepo,
IChattersSnapshotsRepo chattersSnapshotsRepo, IBank<User> tokenBank,
ISubscriptionLogRepo subscriptionLogRepo, ILinkedAccountRepo linkedAccountRepo,
OverlayConnection overlayConnection)
public class ChatFactory(
ILoggerFactory loggerFactory,
IClock clock,
IUserRepo userRepo,
ICoStreamChannelsRepo coStreamChannelsRepo,
IBank<User> tokenBank,
ISubscriptionLogRepo subscriptionLogRepo,
ILinkedAccountRepo linkedAccountRepo,
OverlayConnection overlayConnection)
{
public IChat Create(ConnectionConfig config) =>
config switch
{
_loggerFactory = loggerFactory;
_clock = clock;
_userRepo = userRepo;
_chattersSnapshotsRepo = chattersSnapshotsRepo;
_tokenBank = tokenBank;
_subscriptionLogRepo = subscriptionLogRepo;
_linkedAccountRepo = linkedAccountRepo;
_overlayConnection = overlayConnection;
}

public IChat Create(ConnectionConfig config) =>
config switch
{
ConnectionConfig.Console cfg => new ConsoleChat(config.Name, _loggerFactory, cfg, _userRepo),
ConnectionConfig.Twitch cfg => new TwitchChat(config.Name, _loggerFactory, _clock, cfg, _userRepo,
_chattersSnapshotsRepo,
new SubscriptionProcessor(
_loggerFactory.CreateLogger<SubscriptionProcessor>(),
_tokenBank, _userRepo, _subscriptionLogRepo, _linkedAccountRepo),
_overlayConnection),
ConnectionConfig.Simulation cfg => new SimulationChat(config.Name, _loggerFactory, cfg, _userRepo),
_ => throw new ArgumentOutOfRangeException(nameof(config), "unknown chat connector type")
};
}
ConnectionConfig.Console cfg => new ConsoleChat(config.Name, loggerFactory, cfg, userRepo),
ConnectionConfig.Twitch cfg => new TwitchChat(config.Name, loggerFactory, clock,
cfg, userRepo, coStreamChannelsRepo,
new SubscriptionProcessor(
loggerFactory.CreateLogger<SubscriptionProcessor>(),
tokenBank, userRepo, subscriptionLogRepo, linkedAccountRepo),
overlayConnection, useTwitchReplies: false),
ConnectionConfig.Simulation cfg => new SimulationChat(config.Name, loggerFactory, cfg, userRepo),
_ => throw new ArgumentOutOfRangeException(nameof(config), "unknown chat connector type")
};
}
51 changes: 17 additions & 34 deletions TPP.Core/Chat/ConsoleChat.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.Logging;
using NodaTime;
Expand Down Expand Up @@ -41,19 +42,23 @@ public Task SendWhisper(User target, string message)
return Task.CompletedTask;
}

public void Dispose()
{
Console.In.Close();
}

public string Name { get; }
public event EventHandler<MessageEventArgs>? IncomingMessage;

private async Task ReadInput()
private async Task ReadInput(CancellationToken cancellationToken)
{
string? line;
while ((line = await Console.In.ReadLineAsync()) != null)
while (!cancellationToken.IsCancellationRequested)
{
string line;
try
{
// Put Console.In.ReadLineAsync onto the thread pool, because Console.In's TextReader#ReadLineAsync
// surprisingly actually runs synchronous, and we do not want to block the async runtime.
string? maybeLine = await Task.Run(async () => await Console.In.ReadLineAsync(cancellationToken));
if (maybeLine == null) break;
line = maybeLine;
}
catch (OperationCanceledException) { break; }
string username = _config.Username;
if (line.StartsWith('#'))
{
Expand All @@ -63,11 +68,11 @@ private async Task ReadInput()
}
string simpleName = username.ToLower();

var source = MessageSource.Chat;
MessageSource source = new MessageSource.PrimaryChat();
if (line.StartsWith('>'))
{
line = line[1..];
source = MessageSource.Whisper;
source = new MessageSource.Whisper();
}

// re-use already existing users if they exist, otherwise it gets confusing when you want to impersonate
Expand All @@ -88,36 +93,14 @@ private async Task ReadInput()
}
}

public Task Connect()
public async Task Start(CancellationToken cancellationToken)
{
Task.Run(ReadInput).ContinueWith(task =>
{
if (task.IsFaulted)
_logger.LogError(task.Exception, "console read task failed");
else
_logger.LogInformation("console read task finished");
});

Console.Out.WriteLine($"Chatting via console is now enabled. You are known as '{_config.Username}'.");
Console.Out.WriteLine(
"Prefixing a message with '#username ' will post as a different user, e.g. '#someone !help'");
Console.Out.WriteLine("Prefixing a message with '>' will make it a whisper, e.g. '>balance'");
Console.Out.WriteLine("You can combine both, e.g. '#someone >balance'");
return Task.CompletedTask;
}

private static Task PrintAction(string message)
{
Console.Out.WriteLine($"=== {message} ===");
return Task.CompletedTask;
await ReadInput(cancellationToken);
}

public Task EnableEmoteOnly() => PrintAction("enable emote only");
public Task DisableEmoteOnly() => PrintAction("disable emote only");
public Task DeleteMessage(string messageId) => PrintAction($"delete message with id {messageId}");
public Task Timeout(User user, string? message, Duration duration) =>
PrintAction($"time out {user.Name} for {duration}");
public Task Ban(User user, string? message) => PrintAction($"ban {user.Name}");
public Task Unban(User user, string? message) => PrintAction($"unban {user.Name}");
}
}
19 changes: 12 additions & 7 deletions TPP.Core/Chat/IChat.cs
Original file line number Diff line number Diff line change
Expand Up @@ -23,15 +23,20 @@ public interface IChatModeChanger
public Task DisableEmoteOnly();
}

public interface IChat : IMessageSender, IChatModeChanger, IExecutor, IDisposable
public interface IMessageSource
{
string Name { get; }

event EventHandler<MessageEventArgs> IncomingMessage;
}

/// Establishes the connection.
/// All subsequent repeated invocations on this instance will fail.
/// The connection gets closed by disposing this instance.
Task Connect();
/// <summary>
/// Interface that describes a chat where messages can be received from and sent to,
/// in the context of a connection lifecycle.
/// Classes that implement this interface may also implement some of these to enable some additional features:
/// <see cref="IChatModeChanger"/> for mod commands to change the chat mode,
/// <see cref="IExecutor"/> for automated chat moderation
/// </summary>
public interface IChat : IMessageSender, IMessageSource, IWithLifecycle
{
string Name { get; }
}
}
72 changes: 24 additions & 48 deletions TPP.Core/Chat/SendOutQueuedMessagesWorker.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,17 +11,14 @@ namespace TPP.Core.Chat;
/// <summary>
/// Sends out messages that the old core queued in the database.
/// </summary>
public sealed class SendOutQueuedMessagesWorker : IDisposable
public sealed class SendOutQueuedMessagesWorker : IWithLifecycle
{
private readonly ILogger<SendOutQueuedMessagesWorker> _logger;
private readonly IIncomingMessagequeueRepo _incomingMessagequeueRepo;
private readonly IUserRepo _userRepo;
private readonly IMessageSender _messageSender;
private readonly IClock _clock;

private readonly CancellationTokenSource _cancellationTokenSource;
private Task? _runTask = null;

public SendOutQueuedMessagesWorker(
ILogger<SendOutQueuedMessagesWorker> logger,
IIncomingMessagequeueRepo incomingMessagequeueRepo,
Expand All @@ -34,60 +31,39 @@ public SendOutQueuedMessagesWorker(
_userRepo = userRepo;
_messageSender = messageSender;
_clock = clock;
_cancellationTokenSource = new CancellationTokenSource();
}

public void Start()
public async Task Start(CancellationToken cancellationToken)
{
if (_runTask != null) throw new InvalidOperationException("the worker is already running!");
_runTask = Run();
}

private async Task Stop()
{
if (_runTask == null) throw new InvalidOperationException("the worker is not running!");
_cancellationTokenSource.Cancel();
Instant olderThan = _clock.GetCurrentInstant() - Duration.FromMinutes(5);
await _incomingMessagequeueRepo.Prune(olderThan);
try
{
await _runTask;
}
catch (OperationCanceledException)
{
await _incomingMessagequeueRepo.ForEachAsync(ProcessOnce, cancellationToken);
}
catch (OperationCanceledException) { }
}

public async Task Run()
private async Task ProcessOnce(IncomingMessagequeueItem item)
{
Instant olderThan = _clock.GetCurrentInstant() - Duration.FromMinutes(5);
await _incomingMessagequeueRepo.Prune(olderThan);
await _incomingMessagequeueRepo.ForEachAsync(async item =>
_logger.LogDebug("Received message from queue to send out: {Message}", item);
if (item.MessageType == MessageType.Chat)
{
await _messageSender.SendMessage(item.Message);
}
else if (item.MessageType == MessageType.Whisper)
{
_logger.LogDebug("Received message from queue to send out: {Message}", item);
if (item.MessageType == MessageType.Chat)
{
await _messageSender.SendMessage(item.Message);
}
else if (item.MessageType == MessageType.Whisper)
{
User? user = await _userRepo.FindById(item.Target);
if (user == null)
_logger.LogError(
"Cannot send out queued whisper message because User-ID '{UserId}' is unknown: {Message}",
item.Target, item);
else
await _messageSender.SendWhisper(user, item.Message);
}
User? user = await _userRepo.FindById(item.Target);
if (user == null)
_logger.LogError(
"Cannot send out queued whisper message because User-ID '{UserId}' is unknown: {Message}",
item.Target, item);
else
{
throw new ArgumentException("Unknown message type {}", item.MessageType.ToString());
}
}, _cancellationTokenSource.Token);
}

public void Dispose()
{
if (_runTask != null) Stop().Wait();
_cancellationTokenSource.Dispose();
_runTask?.Dispose();
await _messageSender.SendWhisper(user, item.Message);
}
else
{
throw new ArgumentException("Unknown message type {}", item.MessageType.ToString());
}
}
}
Loading

0 comments on commit f946a77

Please sign in to comment.