Skip to content

Commit

Permalink
Merge pull request #415 from TwitchPlaysPokemon/twitch_eventsub_subsr…
Browse files Browse the repository at this point in the history
…iptions

subscriptions through EventSub
  • Loading branch information
Felk authored Sep 1, 2024
2 parents d209c04 + 7e11069 commit 8da20cd
Show file tree
Hide file tree
Showing 21 changed files with 646 additions and 573 deletions.
12 changes: 10 additions & 2 deletions .github/workflows/deploy.sh
Original file line number Diff line number Diff line change
@@ -1,13 +1,21 @@
# Test that the new executable runs with the existing config.
# Redirect any output just to make sure no json parsing errors or similar can leak secrets.
# Then just replace the executable atomically using mv and restart the service.
if ./core_update testconfig >/dev/null 2>&1 ; then
testconfig_output=$(./core_update testconfig 2>&1)
testconfig_exitcode=$?
if [[ $testconfig_exitcode == 0 ]]; then
\cp core core_deploybackup && \
mv core_update core && \
systemctl --user restart tpp-dualcore && \
echo "Successfully deployed!"
exit 0
elif [[ $testconfig_exitcode == 42 ]]; then
# 42 = Arbitrary exit code to indicate a semantic error, see also Program.cs
echo "Failed to run 'testconfig' for new deployment, a semantic error occurred:"
echo testconfig_output
exit 1
else
echo "Failed to run 'testconfig' for new deployment."
echo "Failed to run 'testconfig' for new deployment, an uncaught exception occurred."
echo "The output is suppressed to avoid leaking sensitive data, but this typically means the config file has syntactic or semantic errors."
exit 1
fi
5 changes: 2 additions & 3 deletions TPP.Common/EnumExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,9 @@ namespace TPP.Common
{
public static class EnumExtensions
{
public static string? GetEnumMemberValue<T>(this T value)
where T : struct, IConvertible
public static string? GetEnumMemberValue(this Enum value)
{
return typeof(T)
return value.GetType()
.GetTypeInfo()
.DeclaredMembers
.SingleOrDefault(x => x.Name == value.ToString())
Expand Down
2 changes: 1 addition & 1 deletion TPP.Core/Chat/ChatFactory.cs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ public IChat Create(ConnectionConfig config) =>
cfg, userRepo, coStreamChannelsRepo,
new SubscriptionProcessor(
loggerFactory.CreateLogger<SubscriptionProcessor>(),
tokenBank, userRepo, subscriptionLogRepo, linkedAccountRepo),
tokenBank, userRepo, subscriptionLogRepo, linkedAccountRepo, Duration.FromSeconds(10)),
overlayConnection),
ConnectionConfig.Simulation cfg => new SimulationChat(config.Name, loggerFactory, cfg, userRepo),
_ => throw new ArgumentOutOfRangeException(nameof(config), "unknown chat connector type")
Expand Down
136 changes: 11 additions & 125 deletions TPP.Core/Chat/TwitchChat.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.Logging;
Expand All @@ -10,12 +9,6 @@
using TPP.Core.Overlay;
using TPP.Core.Utils;
using TPP.Persistence;
using TwitchLib.Api.Auth;
using TwitchLib.Client;
using TwitchLib.Client.Events;
using TwitchLib.Client.Models;
using TwitchLib.Communication.Clients;
using TwitchLib.Communication.Events;
using User = TPP.Model.User;

namespace TPP.Core.Chat
Expand All @@ -26,12 +19,10 @@ public sealed class TwitchChat : IChat, IChatModeChanger, IExecutor
public event EventHandler<MessageEventArgs>? IncomingMessage;

private readonly ILogger<TwitchChat> _logger;
private readonly IClock _clock;
public readonly string ChannelId;
private readonly IUserRepo _userRepo;
private readonly TwitchClient _twitchClient;
public readonly TwitchApi TwitchApi;
private readonly TwitchLibSubscriptionWatcher? _subscriptionWatcher;
private readonly string _channelName;
private readonly string _botUsername;
private readonly TwitchChatSender _twitchChatSender;
private readonly TwitchChatModeChanger _twitchChatModeChanger;
private readonly TwitchChatExecutor _twitchChatExecutor;
Expand All @@ -50,156 +41,51 @@ public TwitchChat(
{
Name = name;
_logger = loggerFactory.CreateLogger<TwitchChat>();
_clock = clock;
ChannelId = chatConfig.ChannelId;
_userRepo = userRepo;
_channelName = chatConfig.Channel;
_botUsername = chatConfig.Username;

TwitchApi = new TwitchApi(
loggerFactory,
clock,
chatConfig.InfiniteAccessToken,
chatConfig.RefreshToken,
chatConfig.ChannelInfiniteAccessToken,
chatConfig.ChannelRefreshToken,
chatConfig.AppClientId,
chatConfig.AppClientSecret);
TwitchEventSubChat = new TwitchEventSubChat(loggerFactory, clock, TwitchApi, _userRepo,
_twitchChatSender = new TwitchChatSender(loggerFactory, TwitchApi, chatConfig, useTwitchReplies);
TwitchEventSubChat = new TwitchEventSubChat(loggerFactory, clock, TwitchApi, userRepo,
subscriptionProcessor, overlayConnection, _twitchChatSender,
chatConfig.ChannelId, chatConfig.UserId,
chatConfig.CoStreamInputsEnabled, chatConfig.CoStreamInputsOnlyLive, coStreamChannelsRepo);
_twitchClient = new TwitchClient(
client: new WebSocketClient(),
loggerFactory: loggerFactory);
var credentials = new ConnectionCredentials(
twitchUsername: chatConfig.Username,
twitchOAuth: chatConfig.Password,
disableUsernameCheck: true);
// disable TwitchLib's command features, we do that ourselves
_twitchClient.ChatCommandIdentifiers.Add('\0');
_twitchClient.WhisperCommandIdentifiers.Add('\0');
_twitchClient.Initialize(
credentials: credentials,
channel: chatConfig.Channel);

_twitchClient.OnError += OnError;
_twitchClient.OnConnectionError += OnConnectionError;
TwitchEventSubChat.IncomingMessage += MessageReceived;
_twitchChatSender = new TwitchChatSender(loggerFactory, TwitchApi, chatConfig, useTwitchReplies);
_twitchChatModeChanger = new TwitchChatModeChanger(
loggerFactory.CreateLogger<TwitchChatModeChanger>(), TwitchApi, chatConfig);
_twitchChatExecutor = new TwitchChatExecutor(loggerFactory.CreateLogger<TwitchChatExecutor>(),
TwitchApi, chatConfig);

_subscriptionWatcher = chatConfig.MonitorSubscriptions
? new TwitchLibSubscriptionWatcher(loggerFactory, _userRepo, _twitchClient, clock,
subscriptionProcessor, _twitchChatSender, overlayConnection, chatConfig.Channel)
: null;
}

private void MessageReceived(object? sender, MessageEventArgs args)
{
IncomingMessage?.Invoke(this, args);
}

// Subscribe to TwitchClient errors to hopefully prevent the very rare incidents where the bot effectively
// gets disconnected, but the CheckConnectivityWorker cannot detect it and doesn't reconnect.
// I've never caught this event firing (it doesn't fire when you pull the ethernet cable either)
// but the theory is that if this bug occurs: https://github.com/dotnet/runtime/issues/48246 we can call
// Disconnect() to force the underlying ClientWebSocket.State to change to Abort.
private async Task OnError(object? sender, OnErrorEventArgs e)
{
_logger.LogError(e.Exception, "The TwitchClient encountered an error. Forcing a disconnect");
await _twitchClient.DisconnectAsync();
// let the CheckConnectivityWorker handle reconnecting
}

private async Task OnConnectionError(object? sender, OnConnectionErrorArgs e)
{
// same procedure as above
_logger.LogError("The TwitchClient encountered a connection error. Forcing a disconnect. Error: {Error}",
e.Error.Message);
await _twitchClient.DisconnectAsync();
}

/// Copied from TPP.Core's README.md
private static readonly Dictionary<string, string> ScopesAndWhatTheyAreNeededFor = new()
{
["chat:read"] = "Read messages from chat (via IRC/TMI)",
["chat:edit"] = "Send messages to chat (via IRC/TMI)",
["user:bot"] = "Appear in chat as bot",
["user:read:chat"] = "Read messages from chat. (via EventSub)",
["user:write:chat"] = "Send messages to chat. (via Twitch API)",
["user:manage:whispers"] = "Sending and receiving whispers",
["moderator:read:chatters"] = "Read the chatters list in the channel (e.g. for badge drops)",
["moderator:read:followers"] = "Read the followers list (currently old core)",
["moderator:manage:banned_users"] = "Timeout, ban and unban users (tpp automod, mod commands)",
["moderator:manage:chat_messages"] = "Delete chat messages (tpp automod, purge invalid bets)",
["moderator:manage:chat_settings"] = "Change chat settings, e.g. emote-only mode (mod commands)",
["channel:read:subscriptions"] = "Reacting to incoming subscriptions",
};

private void ValidateScopes(HashSet<string> presentScopes)
{
foreach ((string scope, string neededFor) in ScopesAndWhatTheyAreNeededFor)
if (!presentScopes.Contains(scope))
_logger.LogWarning("Missing Twitch-API scope '{Scope}', needed for: {NeededFor}", scope, neededFor);
}

public async Task Start(CancellationToken cancellationToken)
{
_logger.LogDebug("Validating API access token...");
ValidateAccessTokenResponse validateResult = await TwitchApi.Validate();
_logger.LogInformation(
"Successfully validated Twitch API access token! Client-ID: {ClientID}, User-ID: {UserID}, " +
"Login: {Login}, Expires in: {Expires}s, Scopes: {Scopes}", validateResult.ClientId,
validateResult.UserId, validateResult.Login, validateResult.ExpiresIn, validateResult.Scopes);
ValidateScopes(validateResult.Scopes.ToHashSet());

await _twitchClient.ConnectAsync();
_logger.LogInformation("Connected to Twitch, channels: {Channels}",
string.Join(", ", _twitchClient.JoinedChannels.Select(c => c.Channel)));
foreach (string problem in await TwitchApi.DetectProblems(_botUsername, _channelName))
_logger.LogWarning("TwitchAPI problem detected: {Problem}", problem);

List<Task> tasks = [];
tasks.Add(CheckConnectivityWorker(cancellationToken));
tasks.Add(TwitchEventSubChat.Start(cancellationToken));
await TaskUtils.WhenAllFastExit(tasks);

await _twitchClient.DisconnectAsync();
await _twitchChatSender.DisposeAsync();
_subscriptionWatcher?.Dispose();
TwitchEventSubChat.IncomingMessage -= MessageReceived;
_logger.LogDebug("twitch chat is now fully shut down");
}

/// TwitchClient's disconnect event appears to fire unreliably,
/// so it is safer to manually check the connection every few seconds.
private async Task CheckConnectivityWorker(CancellationToken cancellationToken)
{
TimeSpan minDelay = TimeSpan.FromSeconds(3);
TimeSpan maxDelay = TimeSpan.FromSeconds(30);
TimeSpan delay = minDelay;
while (!cancellationToken.IsCancellationRequested)
{
delay *= _twitchClient.IsConnected ? 0.5 : 2;
if (delay > maxDelay) delay = maxDelay;
if (delay < minDelay) delay = minDelay;

if (!_twitchClient.IsConnected)
{
_logger.LogError("Not connected to twitch, trying to reconnect...");
try
{
await _twitchClient.ReconnectAsync();
_logger.LogInformation("Successfully reconnected to twitch.");
}
catch (Exception)
{
_logger.LogError("Failed to reconnect, trying again in {Delay} seconds", delay.TotalSeconds);
}
}

try { await Task.Delay(delay, cancellationToken); }
catch (OperationCanceledException) { break; }
}
}

public Task EnableEmoteOnly() => _twitchChatModeChanger.EnableEmoteOnly();
public Task DisableEmoteOnly() => _twitchChatModeChanger.DisableEmoteOnly();

Expand Down
Loading

0 comments on commit 8da20cd

Please sign in to comment.