Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

subscriptions through EventSub #415

Merged
merged 14 commits into from
Sep 1, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 10 additions & 2 deletions .github/workflows/deploy.sh
Original file line number Diff line number Diff line change
@@ -1,13 +1,21 @@
# Test that the new executable runs with the existing config.
# Redirect any output just to make sure no json parsing errors or similar can leak secrets.
# Then just replace the executable atomically using mv and restart the service.
if ./core_update testconfig >/dev/null 2>&1 ; then
testconfig_output=$(./core_update testconfig 2>&1)
testconfig_exitcode=$?
if [[ $testconfig_exitcode == 0 ]]; then
\cp core core_deploybackup && \
mv core_update core && \
systemctl --user restart tpp-dualcore && \
echo "Successfully deployed!"
exit 0
elif [[ $testconfig_exitcode == 42 ]]; then
# 42 = Arbitrary exit code to indicate a semantic error, see also Program.cs
echo "Failed to run 'testconfig' for new deployment, a semantic error occurred:"
echo testconfig_output
exit 1
else
echo "Failed to run 'testconfig' for new deployment."
echo "Failed to run 'testconfig' for new deployment, an uncaught exception occurred."
echo "The output is suppressed to avoid leaking sensitive data, but this typically means the config file has syntactic or semantic errors."
exit 1
fi
5 changes: 2 additions & 3 deletions TPP.Common/EnumExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,9 @@ namespace TPP.Common
{
public static class EnumExtensions
{
public static string? GetEnumMemberValue<T>(this T value)
where T : struct, IConvertible
public static string? GetEnumMemberValue(this Enum value)
{
return typeof(T)
return value.GetType()
.GetTypeInfo()
.DeclaredMembers
.SingleOrDefault(x => x.Name == value.ToString())
Expand Down
2 changes: 1 addition & 1 deletion TPP.Core/Chat/ChatFactory.cs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ public IChat Create(ConnectionConfig config) =>
cfg, userRepo, coStreamChannelsRepo,
new SubscriptionProcessor(
loggerFactory.CreateLogger<SubscriptionProcessor>(),
tokenBank, userRepo, subscriptionLogRepo, linkedAccountRepo),
tokenBank, userRepo, subscriptionLogRepo, linkedAccountRepo, Duration.FromSeconds(10)),
overlayConnection),
ConnectionConfig.Simulation cfg => new SimulationChat(config.Name, loggerFactory, cfg, userRepo),
_ => throw new ArgumentOutOfRangeException(nameof(config), "unknown chat connector type")
Expand Down
136 changes: 11 additions & 125 deletions TPP.Core/Chat/TwitchChat.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.Logging;
Expand All @@ -10,12 +9,6 @@
using TPP.Core.Overlay;
using TPP.Core.Utils;
using TPP.Persistence;
using TwitchLib.Api.Auth;
using TwitchLib.Client;
using TwitchLib.Client.Events;
using TwitchLib.Client.Models;
using TwitchLib.Communication.Clients;
using TwitchLib.Communication.Events;
using User = TPP.Model.User;

namespace TPP.Core.Chat
Expand All @@ -26,12 +19,10 @@ public sealed class TwitchChat : IChat, IChatModeChanger, IExecutor
public event EventHandler<MessageEventArgs>? IncomingMessage;

private readonly ILogger<TwitchChat> _logger;
private readonly IClock _clock;
public readonly string ChannelId;
private readonly IUserRepo _userRepo;
private readonly TwitchClient _twitchClient;
public readonly TwitchApi TwitchApi;
private readonly TwitchLibSubscriptionWatcher? _subscriptionWatcher;
private readonly string _channelName;
private readonly string _botUsername;
private readonly TwitchChatSender _twitchChatSender;
private readonly TwitchChatModeChanger _twitchChatModeChanger;
private readonly TwitchChatExecutor _twitchChatExecutor;
Expand All @@ -50,156 +41,51 @@ public TwitchChat(
{
Name = name;
_logger = loggerFactory.CreateLogger<TwitchChat>();
_clock = clock;
ChannelId = chatConfig.ChannelId;
_userRepo = userRepo;
_channelName = chatConfig.Channel;
_botUsername = chatConfig.Username;

TwitchApi = new TwitchApi(
loggerFactory,
clock,
chatConfig.InfiniteAccessToken,
chatConfig.RefreshToken,
chatConfig.ChannelInfiniteAccessToken,
chatConfig.ChannelRefreshToken,
chatConfig.AppClientId,
chatConfig.AppClientSecret);
TwitchEventSubChat = new TwitchEventSubChat(loggerFactory, clock, TwitchApi, _userRepo,
_twitchChatSender = new TwitchChatSender(loggerFactory, TwitchApi, chatConfig, useTwitchReplies);
TwitchEventSubChat = new TwitchEventSubChat(loggerFactory, clock, TwitchApi, userRepo,
subscriptionProcessor, overlayConnection, _twitchChatSender,
chatConfig.ChannelId, chatConfig.UserId,
chatConfig.CoStreamInputsEnabled, chatConfig.CoStreamInputsOnlyLive, coStreamChannelsRepo);
_twitchClient = new TwitchClient(
client: new WebSocketClient(),
loggerFactory: loggerFactory);
var credentials = new ConnectionCredentials(
twitchUsername: chatConfig.Username,
twitchOAuth: chatConfig.Password,
disableUsernameCheck: true);
// disable TwitchLib's command features, we do that ourselves
_twitchClient.ChatCommandIdentifiers.Add('\0');
_twitchClient.WhisperCommandIdentifiers.Add('\0');
_twitchClient.Initialize(
credentials: credentials,
channel: chatConfig.Channel);

_twitchClient.OnError += OnError;
_twitchClient.OnConnectionError += OnConnectionError;
TwitchEventSubChat.IncomingMessage += MessageReceived;
_twitchChatSender = new TwitchChatSender(loggerFactory, TwitchApi, chatConfig, useTwitchReplies);
_twitchChatModeChanger = new TwitchChatModeChanger(
loggerFactory.CreateLogger<TwitchChatModeChanger>(), TwitchApi, chatConfig);
_twitchChatExecutor = new TwitchChatExecutor(loggerFactory.CreateLogger<TwitchChatExecutor>(),
TwitchApi, chatConfig);

_subscriptionWatcher = chatConfig.MonitorSubscriptions
? new TwitchLibSubscriptionWatcher(loggerFactory, _userRepo, _twitchClient, clock,
subscriptionProcessor, _twitchChatSender, overlayConnection, chatConfig.Channel)
: null;
}

private void MessageReceived(object? sender, MessageEventArgs args)
{
IncomingMessage?.Invoke(this, args);
}

// Subscribe to TwitchClient errors to hopefully prevent the very rare incidents where the bot effectively
// gets disconnected, but the CheckConnectivityWorker cannot detect it and doesn't reconnect.
// I've never caught this event firing (it doesn't fire when you pull the ethernet cable either)
// but the theory is that if this bug occurs: https://github.com/dotnet/runtime/issues/48246 we can call
// Disconnect() to force the underlying ClientWebSocket.State to change to Abort.
private async Task OnError(object? sender, OnErrorEventArgs e)
{
_logger.LogError(e.Exception, "The TwitchClient encountered an error. Forcing a disconnect");
await _twitchClient.DisconnectAsync();
// let the CheckConnectivityWorker handle reconnecting
}

private async Task OnConnectionError(object? sender, OnConnectionErrorArgs e)
{
// same procedure as above
_logger.LogError("The TwitchClient encountered a connection error. Forcing a disconnect. Error: {Error}",
e.Error.Message);
await _twitchClient.DisconnectAsync();
}

/// Copied from TPP.Core's README.md
private static readonly Dictionary<string, string> ScopesAndWhatTheyAreNeededFor = new()
{
["chat:read"] = "Read messages from chat (via IRC/TMI)",
["chat:edit"] = "Send messages to chat (via IRC/TMI)",
["user:bot"] = "Appear in chat as bot",
["user:read:chat"] = "Read messages from chat. (via EventSub)",
["user:write:chat"] = "Send messages to chat. (via Twitch API)",
["user:manage:whispers"] = "Sending and receiving whispers",
["moderator:read:chatters"] = "Read the chatters list in the channel (e.g. for badge drops)",
["moderator:read:followers"] = "Read the followers list (currently old core)",
["moderator:manage:banned_users"] = "Timeout, ban and unban users (tpp automod, mod commands)",
["moderator:manage:chat_messages"] = "Delete chat messages (tpp automod, purge invalid bets)",
["moderator:manage:chat_settings"] = "Change chat settings, e.g. emote-only mode (mod commands)",
["channel:read:subscriptions"] = "Reacting to incoming subscriptions",
};

private void ValidateScopes(HashSet<string> presentScopes)
{
foreach ((string scope, string neededFor) in ScopesAndWhatTheyAreNeededFor)
if (!presentScopes.Contains(scope))
_logger.LogWarning("Missing Twitch-API scope '{Scope}', needed for: {NeededFor}", scope, neededFor);
}

public async Task Start(CancellationToken cancellationToken)
{
_logger.LogDebug("Validating API access token...");
ValidateAccessTokenResponse validateResult = await TwitchApi.Validate();
_logger.LogInformation(
"Successfully validated Twitch API access token! Client-ID: {ClientID}, User-ID: {UserID}, " +
"Login: {Login}, Expires in: {Expires}s, Scopes: {Scopes}", validateResult.ClientId,
validateResult.UserId, validateResult.Login, validateResult.ExpiresIn, validateResult.Scopes);
ValidateScopes(validateResult.Scopes.ToHashSet());

await _twitchClient.ConnectAsync();
_logger.LogInformation("Connected to Twitch, channels: {Channels}",
string.Join(", ", _twitchClient.JoinedChannels.Select(c => c.Channel)));
foreach (string problem in await TwitchApi.DetectProblems(_botUsername, _channelName))
_logger.LogWarning("TwitchAPI problem detected: {Problem}", problem);

List<Task> tasks = [];
tasks.Add(CheckConnectivityWorker(cancellationToken));
tasks.Add(TwitchEventSubChat.Start(cancellationToken));
await TaskUtils.WhenAllFastExit(tasks);

await _twitchClient.DisconnectAsync();
await _twitchChatSender.DisposeAsync();
_subscriptionWatcher?.Dispose();
TwitchEventSubChat.IncomingMessage -= MessageReceived;
_logger.LogDebug("twitch chat is now fully shut down");
}

/// TwitchClient's disconnect event appears to fire unreliably,
/// so it is safer to manually check the connection every few seconds.
private async Task CheckConnectivityWorker(CancellationToken cancellationToken)
{
TimeSpan minDelay = TimeSpan.FromSeconds(3);
TimeSpan maxDelay = TimeSpan.FromSeconds(30);
TimeSpan delay = minDelay;
while (!cancellationToken.IsCancellationRequested)
{
delay *= _twitchClient.IsConnected ? 0.5 : 2;
if (delay > maxDelay) delay = maxDelay;
if (delay < minDelay) delay = minDelay;

if (!_twitchClient.IsConnected)
{
_logger.LogError("Not connected to twitch, trying to reconnect...");
try
{
await _twitchClient.ReconnectAsync();
_logger.LogInformation("Successfully reconnected to twitch.");
}
catch (Exception)
{
_logger.LogError("Failed to reconnect, trying again in {Delay} seconds", delay.TotalSeconds);
}
}

try { await Task.Delay(delay, cancellationToken); }
catch (OperationCanceledException) { break; }
}
}

public Task EnableEmoteOnly() => _twitchChatModeChanger.EnableEmoteOnly();
public Task DisableEmoteOnly() => _twitchChatModeChanger.DisableEmoteOnly();

Expand Down
Loading
Loading