Skip to content

Commit

Permalink
subscriptions through EventSub WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
Felk committed Jun 29, 2024
1 parent e78ab54 commit 022484b
Show file tree
Hide file tree
Showing 6 changed files with 209 additions and 402 deletions.
97 changes: 4 additions & 93 deletions TPP.Core/Chat/TwitchChat.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.Logging;
Expand All @@ -10,11 +9,6 @@
using TPP.Core.Overlay;
using TPP.Core.Utils;
using TPP.Persistence;
using TwitchLib.Client;
using TwitchLib.Client.Events;
using TwitchLib.Client.Models;
using TwitchLib.Communication.Clients;
using TwitchLib.Communication.Events;
using User = TPP.Model.User;

namespace TPP.Core.Chat
Expand All @@ -25,12 +19,8 @@ public sealed class TwitchChat : IChat, IChatModeChanger, IExecutor
public event EventHandler<MessageEventArgs>? IncomingMessage;

private readonly ILogger<TwitchChat> _logger;
private readonly IClock _clock;
public readonly string ChannelId;
private readonly IUserRepo _userRepo;
private readonly TwitchClient _twitchClient;
public readonly TwitchApi TwitchApi;
private readonly TwitchLibSubscriptionWatcher? _subscriptionWatcher;
private readonly TwitchChatSender _twitchChatSender;
private readonly TwitchChatModeChanger _twitchChatModeChanger;
private readonly TwitchChatExecutor _twitchChatExecutor;
Expand All @@ -49,9 +39,7 @@ public TwitchChat(
{
Name = name;
_logger = loggerFactory.CreateLogger<TwitchChat>();
_clock = clock;
ChannelId = chatConfig.ChannelId;
_userRepo = userRepo;

TwitchApi = new TwitchApi(
loggerFactory,
Expand All @@ -60,113 +48,36 @@ public TwitchChat(
chatConfig.RefreshToken,
chatConfig.AppClientId,
chatConfig.AppClientSecret);
TwitchEventSubChat = new TwitchEventSubChat(loggerFactory, clock, TwitchApi, _userRepo,
_twitchChatSender = new TwitchChatSender(loggerFactory, TwitchApi, chatConfig, useTwitchReplies);

TwitchEventSubChat = new TwitchEventSubChat(loggerFactory, clock, TwitchApi, userRepo,
subscriptionProcessor, overlayConnection, _twitchChatSender,
chatConfig.ChannelId, chatConfig.UserId,
chatConfig.CoStreamInputsEnabled, chatConfig.CoStreamInputsOnlyLive, coStreamChannelsRepo);
_twitchClient = new TwitchClient(
client: new WebSocketClient(),
loggerFactory: loggerFactory);
var credentials = new ConnectionCredentials(
twitchUsername: chatConfig.Username,
twitchOAuth: chatConfig.Password,
disableUsernameCheck: true);
// disable TwitchLib's command features, we do that ourselves
_twitchClient.ChatCommandIdentifiers.Add('\0');
_twitchClient.WhisperCommandIdentifiers.Add('\0');
_twitchClient.Initialize(
credentials: credentials,
channel: chatConfig.Channel);

_twitchClient.OnError += OnError;
_twitchClient.OnConnectionError += OnConnectionError;
TwitchEventSubChat.IncomingMessage += MessageReceived;
_twitchChatSender = new TwitchChatSender(loggerFactory, TwitchApi, chatConfig, useTwitchReplies);
_twitchChatModeChanger = new TwitchChatModeChanger(
loggerFactory.CreateLogger<TwitchChatModeChanger>(), TwitchApi, chatConfig);
_twitchChatExecutor = new TwitchChatExecutor(loggerFactory.CreateLogger<TwitchChatExecutor>(),
TwitchApi, chatConfig);

_subscriptionWatcher = chatConfig.MonitorSubscriptions
? new TwitchLibSubscriptionWatcher(loggerFactory, _userRepo, _twitchClient, clock,
subscriptionProcessor, _twitchChatSender, overlayConnection, chatConfig.Channel)
: null;
}

private void MessageReceived(object? sender, MessageEventArgs args)
{
IncomingMessage?.Invoke(this, args);
}

// Subscribe to TwitchClient errors to hopefully prevent the very rare incidents where the bot effectively
// gets disconnected, but the CheckConnectivityWorker cannot detect it and doesn't reconnect.
// I've never caught this event firing (it doesn't fire when you pull the ethernet cable either)
// but the theory is that if this bug occurs: https://github.com/dotnet/runtime/issues/48246 we can call
// Disconnect() to force the underlying ClientWebSocket.State to change to Abort.
private async Task OnError(object? sender, OnErrorEventArgs e)
{
_logger.LogError(e.Exception, "The TwitchClient encountered an error. Forcing a disconnect");
await _twitchClient.DisconnectAsync();
// let the CheckConnectivityWorker handle reconnecting
}

private async Task OnConnectionError(object? sender, OnConnectionErrorArgs e)
{
// same procedure as above
_logger.LogError("The TwitchClient encountered a connection error. Forcing a disconnect. Error: {Error}",
e.Error.Message);
await _twitchClient.DisconnectAsync();
}

public async Task Start(CancellationToken cancellationToken)
{
await _twitchClient.ConnectAsync();
_logger.LogInformation("Connected to Twitch, channels: {Channels}",
string.Join(", ", _twitchClient.JoinedChannels.Select(c => c.Channel)));

List<Task> tasks = [];
tasks.Add(CheckConnectivityWorker(cancellationToken));
tasks.Add(TwitchEventSubChat.Start(cancellationToken));
await TaskUtils.WhenAllFastExit(tasks);

await _twitchClient.DisconnectAsync();
await _twitchChatSender.DisposeAsync();
_subscriptionWatcher?.Dispose();
TwitchEventSubChat.IncomingMessage -= MessageReceived;
_logger.LogDebug("twitch chat is now fully shut down");
}

/// TwitchClient's disconnect event appears to fire unreliably,
/// so it is safer to manually check the connection every few seconds.
private async Task CheckConnectivityWorker(CancellationToken cancellationToken)
{
TimeSpan minDelay = TimeSpan.FromSeconds(3);
TimeSpan maxDelay = TimeSpan.FromSeconds(30);
TimeSpan delay = minDelay;
while (!cancellationToken.IsCancellationRequested)
{
delay *= _twitchClient.IsConnected ? 0.5 : 2;
if (delay > maxDelay) delay = maxDelay;
if (delay < minDelay) delay = minDelay;

if (!_twitchClient.IsConnected)
{
_logger.LogError("Not connected to twitch, trying to reconnect...");
try
{
await _twitchClient.ReconnectAsync();
_logger.LogInformation("Successfully reconnected to twitch.");
}
catch (Exception)
{
_logger.LogError("Failed to reconnect, trying again in {Delay} seconds", delay.TotalSeconds);
}
}

try { await Task.Delay(delay, cancellationToken); }
catch (OperationCanceledException) { break; }
}
}

public Task EnableEmoteOnly() => _twitchChatModeChanger.EnableEmoteOnly();
public Task DisableEmoteOnly() => _twitchChatModeChanger.DisableEmoteOnly();

Expand Down
Loading

0 comments on commit 022484b

Please sign in to comment.