From 2c4dd5b757fa3aa7592458b4a68b8997f4b18fdc Mon Sep 17 00:00:00 2001 From: gottagofaster236 Date: Sun, 9 Feb 2025 23:32:42 +0100 Subject: [PATCH] Move to EventSub because PubSub is deprecated --- CMakeLists.txt | 4 +- src/EventsubListener.cpp | 246 +++++++++++++++++++++++++++++++++++ src/EventsubListener.h | 67 ++++++++++ src/PubsubListener.cpp | 169 ------------------------ src/PubsubListener.h | 47 ------- src/RewardsTheaterPlugin.cpp | 2 +- src/RewardsTheaterPlugin.h | 4 +- src/TwitchRewardsApi.cpp | 16 +-- src/TwitchRewardsApi.h | 2 +- 9 files changed, 327 insertions(+), 230 deletions(-) create mode 100644 src/EventsubListener.cpp create mode 100644 src/EventsubListener.h delete mode 100644 src/PubsubListener.cpp delete mode 100644 src/PubsubListener.h diff --git a/CMakeLists.txt b/CMakeLists.txt index b2d86ce..5fb751b 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -54,8 +54,8 @@ target_sources( src/ErrorMessageBox.cpp src/ConfirmDeleteReward.h src/ConfirmDeleteReward.cpp - src/PubsubListener.h - src/PubsubListener.cpp + src/EventsubListener.h + src/EventsubListener.cpp src/RewardRedemptionWidget.h src/RewardRedemptionWidget.cpp src/RewardRedemptionQueueDialog.h diff --git a/src/EventsubListener.cpp b/src/EventsubListener.cpp new file mode 100644 index 0000000..9ce9459 --- /dev/null +++ b/src/EventsubListener.cpp @@ -0,0 +1,246 @@ +// SPDX-License-Identifier: GPL-3.0-only +// Copyright (c) 2023, Lev Leontev + +#include "EventsubListener.h" + +#include + +#include "Log.h" +#include "TwitchRewardsApi.h" + +namespace beast = boost::beast; +namespace http = beast::http; +namespace asio = boost::asio; +namespace ssl = asio::ssl; +namespace json = boost::json; +using tcp = asio::ip::tcp; + +using namespace boost::asio::experimental::awaitable_operators; +using namespace std::chrono_literals; + +static constexpr auto INITIAL_KEEPALIVE_TIMEOUT = 30s; +static constexpr auto RECONNECT_DELAY = 10s; +static const char* const CHANNEL_POINTS_SUBSCRIPTION_TYPE = "channel.channel_points_custom_reward_redemption.add"; + +EventsubListener::EventsubListener( + TwitchAuth& twitchAuth, + HttpClient& httpClient, + RewardRedemptionQueue& rewardRedemptionQueue +) + : twitchAuth(twitchAuth), httpClient(httpClient), rewardRedemptionQueue(rewardRedemptionQueue), eventsubThread(1), + eventsubUrl("wss://eventsub.wss.twitch.tv/ws"), processedMessageIds{}, sessionId{}, + keepaliveTimeoutTimer(eventsubThread.ioContext), keepaliveTimeout(INITIAL_KEEPALIVE_TIMEOUT), + usernameCondVar(eventsubThread.ioContext, boost::posix_time::pos_infin) { + connect(&twitchAuth, &TwitchAuth::onUsernameChanged, this, &EventsubListener::reconnectAfterUsernameChange); + asio::co_spawn(eventsubThread.ioContext, asyncReconnectToEventsubForever(), asio::detached); +} + +EventsubListener::~EventsubListener() { + eventsubThread.stop(); +} + +void EventsubListener::reconnectAfterUsernameChange() { + asio::post(eventsubThread.ioContext, [this] { + usernameCondVar.cancel(); // Equivalent to notify_all() for a condition variable. + }); +} + +const char* EventsubListener::KeepaliveTimeoutException::what() const noexcept { + return "KeepaliveTimeoutException"; +} + +const char* EventsubListener::SubscribeToChannelPointsException::what() const noexcept { + return "SubscribeToChannelPointsException"; +} + +const char* EventsubListener::ReconnectException::what() const noexcept { + return "ReconnectException"; +} + +asio::awaitable EventsubListener::asyncReconnectToEventsubForever() { + while (true) { + std::optional usernameOptional = twitchAuth.getUsername(); + if (!usernameOptional.has_value()) { + try { + co_await usernameCondVar.async_wait(asio::use_awaitable); + } catch (const boost::system::system_error&) { + // Username updated. + } + continue; + } + std::string username = usernameOptional.value(); + boost::urls::url eventsubUrl = this->eventsubUrl; + + try { + co_await (asyncConnectToEventsub(username) && usernameCondVar.async_wait(asio::use_awaitable)); + } catch (const std::exception& e) { + log(LOG_ERROR, "Exception in asyncReconnectToEventsubForever: {}", e.what()); + } + + if (twitchAuth.getUsername() != username || eventsubUrl != this->eventsubUrl) { + // Disconnected because of a username or URL change - reconnect immediately. + continue; + } + co_await asio::steady_timer(eventsubThread.ioContext, RECONNECT_DELAY).async_wait(asio::use_awaitable); + } +} + +asio::awaitable EventsubListener::asyncConnectToEventsub(const std::string& username) { + log(LOG_INFO, "Connecting to EventSub URL {} for user {}", eventsubUrl.c_str(), username); + WebsocketStream ws = co_await asyncConnect(); + keepaliveTimeout = INITIAL_KEEPALIVE_TIMEOUT; + resetKeepaliveTimeoutTimer(); + co_await (asyncSubscribeAndReadMessages(ws) && asyncMonitorKeepaliveTimeout()); +} + +asio::awaitable EventsubListener::asyncConnect() { + ssl::context sslContext{ssl::context::tlsv12}; + sslContext.set_default_verify_paths(); + tcp::resolver resolver{eventsubThread.ioContext}; + WebsocketStream ws{eventsubThread.ioContext, sslContext}; + const auto resolveResults = co_await resolver.async_resolve(eventsubUrl.host(), "https", asio::use_awaitable); + + co_await asio::async_connect(get_lowest_layer(ws), resolveResults, asio::use_awaitable); + if (!SSL_set_tlsext_host_name(ws.next_layer().native_handle(), eventsubUrl.host().c_str())) { + throw boost::system::system_error( + boost::system::error_code(static_cast(::ERR_get_error()), asio::error::get_ssl_category()), + "Failed to set SNI Hostname" + ); + } + co_await ws.next_layer().async_handshake(ssl::stream_base::client, asio::use_awaitable); + co_await ws.async_handshake(eventsubUrl.host(), eventsubUrl.encoded_target(), asio::use_awaitable); + co_return ws; +} + +boost::asio::awaitable EventsubListener::asyncSubscribeAndReadMessages(WebsocketStream& ws) { + co_await asyncWaitForWelcomeMessage(ws); + co_await asyncSubscribeToChannelPoints(ws); + co_await asyncReadMessages(ws); +} + +boost::asio::awaitable EventsubListener::asyncMonitorKeepaliveTimeout() { + while (true) { + auto expiry = keepaliveTimeoutTimer.expiry(); + try { + co_await keepaliveTimeoutTimer.async_wait(asio::use_awaitable); + } catch (const boost::system::system_error&) { + if (keepaliveTimeoutTimer.expiry() != expiry) { + // Timeout updated. + continue; + } else { + throw; + } + } + log(LOG_ERROR, "Keepalive timeout expired"); + throw KeepaliveTimeoutException(); + } +} + +void EventsubListener::resetKeepaliveTimeoutTimer() { + auto newExpiry = std::chrono::steady_clock::now() + keepaliveTimeout; + // expires_at cancels any pending waits. Make sure we don't call it without a reason. + if (newExpiry != keepaliveTimeoutTimer.expiry()) { + keepaliveTimeoutTimer.expires_at(newExpiry); + } +} + +asio::awaitable EventsubListener::asyncWaitForWelcomeMessage(WebsocketStream& ws) { + json::value message; + do { + message = co_await asyncReadMessage(ws); + } while (getMessageType(message) != "session_welcome"); + + log(LOG_INFO, "Successfully connected to EventSub"); + + const json::value& session = message.at("payload").at("session"); + sessionId = value_to(session.at("id")); + int keepaliveTimeoutSeconds = value_to(session.at("keepalive_timeout_seconds")); + // Twitch server sends a keepalive message every {keepaliveTimeoutSeconds}. + // Because it's not 100% precise, sometimes it's a bit less than keepaliveTimeoutSeconds, sometimes a bit more. + // Therefore just multiply it by two to be safe. + keepaliveTimeout = std::chrono::seconds(keepaliveTimeoutSeconds * 2); + resetKeepaliveTimeoutTimer(); +} + +asio::awaitable EventsubListener::asyncSubscribeToChannelPoints(WebsocketStream& ws) { + json::value requestBody{ + {"type", "channel.channel_points_custom_reward_redemption.add"}, + {"version", "1"}, + {"condition", + { + {"broadcaster_user_id", twitchAuth.getUserIdOrThrow()}, + }}, + {"transport", + { + {"method", "websocket"}, + {"session_id", sessionId}, + }} + }; + HttpClient::Response response = co_await httpClient.request( + "api.twitch.tv", "/helix/eventsub/subscriptions", twitchAuth, {}, http::verb::post, requestBody + ); + if (response.status != http::status::accepted) { + log(LOG_ERROR, "HTTP status {} in asyncSubscribeToChannelPoints", static_cast(response.status)); + throw SubscribeToChannelPointsException(); + } +} + +asio::awaitable EventsubListener::asyncReadMessages(WebsocketStream& ws) { + while (true) { + json::value message = co_await asyncReadMessage(ws); + std::string type = getMessageType(message); + + if (type == "notification") { + json::value payload = message.at("payload"); + std::string subscriptionType = value_to(payload.at("subscription").at("type")); + if (subscriptionType != CHANNEL_POINTS_SUBSCRIPTION_TYPE) { + continue; + } + json::value event = payload.at("event"); + Reward reward = TwitchRewardsApi::parseEventsubReward(event.at("reward")); + std::string redemptionId = value_to(event.at("id")); + rewardRedemptionQueue.queueRewardRedemption(RewardRedemption{reward, redemptionId}); + } else if (type == "session_reconnect") { + eventsubUrl = boost::url(value_to(message.at("payload").at("session").at("reconnect_url"))); + throw ReconnectException(); + } + } +} + +asio::awaitable EventsubListener::asyncReadMessage(WebsocketStream& ws) { + while (true) { + json::value message = co_await asyncReadMessageIgnoringDuplicates(ws); + std::string messageId; + try { + messageId = value_to(message.at("metadata").at("message_id")); + } catch (const boost::system::system_error&) { + log(LOG_ERROR, "Could not parse message_id"); + co_return message; + } + // insert returns a pair of + if (processedMessageIds.insert(messageId).second) { + co_return message; + } + // Received a duplicate messsage, skip it and read the next one. + } +} + +asio::awaitable EventsubListener::asyncReadMessageIgnoringDuplicates(WebsocketStream& ws) { + std::string message; + auto buffer = asio::dynamic_buffer(message); + co_await ws.async_read(buffer, asio::use_awaitable); + resetKeepaliveTimeoutTimer(); + if (message.empty()) { + co_return json::value{}; + } + co_return json::parse(message); +} + +std::string EventsubListener::getMessageType(boost::json::value message) { + return value_to(message.at("metadata").at("message_type")); +} + +asio::awaitable EventsubListener::asyncSendMessage(WebsocketStream& ws, const json::value& message) { + std::string messageSerialized = json::serialize(message); + co_await ws.async_write(asio::buffer(messageSerialized), asio::use_awaitable); +} diff --git a/src/EventsubListener.h b/src/EventsubListener.h new file mode 100644 index 0000000..dba9974 --- /dev/null +++ b/src/EventsubListener.h @@ -0,0 +1,67 @@ +// SPDX-License-Identifier: GPL-3.0-only +// Copyright (c) 2023, Lev Leontev + +#pragma once +#include +#include +#include +#include +#include + +#include "BoostAsio.h" +#include "HttpClient.h" +#include "IoThreadPool.h" +#include "RewardRedemptionQueue.h" +#include "TwitchAuth.h" + +/// Listens to channel points redemptions. Read https://dev.twitch.tv/docs/eventsub/ for API documentation. +class EventsubListener : public QObject { + Q_OBJECT + +public: + EventsubListener(TwitchAuth& twitchAuth, HttpClient& httpClient, RewardRedemptionQueue& rewardRedemptionQueue); + ~EventsubListener(); + +private slots: + void reconnectAfterUsernameChange(); + +private: + using WebsocketStream = boost::beast::websocket::stream>; + + class KeepaliveTimeoutException : public std::exception { + const char* what() const noexcept override; + }; + + class SubscribeToChannelPointsException : public std::exception { + const char* what() const noexcept override; + }; + + class ReconnectException : public std::exception { + const char* what() const noexcept override; + }; + + boost::asio::awaitable asyncReconnectToEventsubForever(); + boost::asio::awaitable asyncConnectToEventsub(const std::string& username); + boost::asio::awaitable asyncConnect(); + boost::asio::awaitable asyncSubscribeAndReadMessages(WebsocketStream& ws); + boost::asio::awaitable asyncMonitorKeepaliveTimeout(); + void resetKeepaliveTimeoutTimer(); + boost::asio::awaitable asyncWaitForWelcomeMessage(WebsocketStream& ws); + boost::asio::awaitable asyncSubscribeToChannelPoints(WebsocketStream& ws); + boost::asio::awaitable asyncReadMessages(WebsocketStream& ws); + boost::asio::awaitable asyncReadMessage(WebsocketStream& ws); + boost::asio::awaitable asyncReadMessageIgnoringDuplicates(WebsocketStream& ws); + static std::string getMessageType(boost::json::value message); + static boost::asio::awaitable asyncSendMessage(WebsocketStream& ws, const boost::json::value& message); + + TwitchAuth& twitchAuth; + HttpClient& httpClient; + RewardRedemptionQueue& rewardRedemptionQueue; + IoThreadPool eventsubThread; + boost::urls::url eventsubUrl; + std::set processedMessageIds; + std::string sessionId; + boost::asio::steady_timer keepaliveTimeoutTimer; + std::chrono::seconds keepaliveTimeout; + boost::asio::deadline_timer usernameCondVar; +}; diff --git a/src/PubsubListener.cpp b/src/PubsubListener.cpp deleted file mode 100644 index c204fc1..0000000 --- a/src/PubsubListener.cpp +++ /dev/null @@ -1,169 +0,0 @@ -// SPDX-License-Identifier: GPL-3.0-only -// Copyright (c) 2023, Lev Leontev - -#include "PubsubListener.h" - -#include - -#include "Log.h" -#include "TwitchRewardsApi.h" - -namespace beast = boost::beast; -namespace http = beast::http; -namespace asio = boost::asio; -namespace ssl = asio::ssl; -namespace json = boost::json; -using tcp = asio::ip::tcp; - -using namespace boost::asio::experimental::awaitable_operators; -using namespace std::chrono_literals; - -static const auto RECONNECT_DELAY = 10s; -static const auto PING_PERIOD = 15s; -static const char* const CHANNEL_POINTS_TOPIC = "channel-points-channel-v1"; - -PubsubListener::PubsubListener(TwitchAuth& twitchAuth, RewardRedemptionQueue& rewardRedemptionQueue) - : twitchAuth(twitchAuth), rewardRedemptionQueue(rewardRedemptionQueue), pubsubThread(1), - usernameCondVar(pubsubThread.ioContext, boost::posix_time::pos_infin), - lastPongReceivedAt(std::chrono::steady_clock::now()) { - connect(&twitchAuth, &TwitchAuth::onUsernameChanged, this, &PubsubListener::reconnectAfterUsernameChange); - asio::co_spawn(pubsubThread.ioContext, asyncReconnectToPubsubForever(), asio::detached); -} - -PubsubListener::~PubsubListener() { - pubsubThread.stop(); -} - -void PubsubListener::reconnectAfterUsernameChange() { - asio::post(pubsubThread.ioContext, [this] { - usernameCondVar.cancel(); // Equivalent to notify_all() for a condition variable. - }); -} - -const char* PubsubListener::NoPingAnswerException::what() const noexcept { - return "NoPingAnswerException"; -} - -asio::awaitable PubsubListener::asyncReconnectToPubsubForever() { - while (true) { - std::optional usernameOptional = twitchAuth.getUsername(); - if (!usernameOptional.has_value()) { - try { - co_await usernameCondVar.async_wait(asio::use_awaitable); - } catch (const boost::system::system_error&) { - // Username updated. - } - continue; - } - std::string username = usernameOptional.value(); - - try { - co_await (asyncConnectToPubsub(username) && usernameCondVar.async_wait(asio::use_awaitable)); - } catch (const std::exception& e) { - log(LOG_ERROR, "Exception in asyncReconnectToPubsubForever: {}", e.what()); - } - - if (twitchAuth.getUsername() != username) { - // Disconnected because of a username change - reconnect immediately. - continue; - } - co_await asio::steady_timer(pubsubThread.ioContext, RECONNECT_DELAY).async_wait(asio::use_awaitable); - } -} - -asio::awaitable PubsubListener::asyncConnectToPubsub(const std::string& username) { - log(LOG_INFO, "Connecting to PubSub for user {}", username); - WebsocketStream ws = co_await asyncConnect("pubsub-edge.twitch.tv"); - co_await asyncSubscribeToChannelPoints(ws); - co_await (asyncSendPingMessages(ws) && asyncReadMessages(ws)); -} - -asio::awaitable PubsubListener::asyncConnect(const std::string& host) { - ssl::context sslContext{ssl::context::tlsv12}; - sslContext.set_default_verify_paths(); - tcp::resolver resolver{pubsubThread.ioContext}; - WebsocketStream ws{pubsubThread.ioContext, sslContext}; - const auto resolveResults = co_await resolver.async_resolve(host, "https", asio::use_awaitable); - - co_await asio::async_connect(get_lowest_layer(ws), resolveResults, asio::use_awaitable); - if (!SSL_set_tlsext_host_name(ws.next_layer().native_handle(), host.c_str())) { - throw boost::system::system_error( - boost::system::error_code(static_cast(::ERR_get_error()), asio::error::get_ssl_category()), - "Failed to set SNI Hostname" - ); - } - co_await ws.next_layer().async_handshake(ssl::stream_base::client, asio::use_awaitable); - co_await ws.async_handshake(host, "/", asio::use_awaitable); - co_return ws; -} - -asio::awaitable PubsubListener::asyncSubscribeToChannelPoints(WebsocketStream& ws) { - std::string topicName = fmt::format("{}.{}", CHANNEL_POINTS_TOPIC, twitchAuth.getUserIdOrThrow()); - json::value message{ - {"type", "LISTEN"}, - { - "data", - { - { - "topics", - {topicName}, - }, - {"auth_token", twitchAuth.getAccessTokenOrThrow()}, - }, - }, - }; - co_await asyncSendMessage(ws, message); -} - -asio::awaitable PubsubListener::asyncSendPingMessages(WebsocketStream& ws) { - json::value message{{"type", "PING"}}; - while (true) { - auto sentPingAt = std::chrono::steady_clock::now(); - co_await asyncSendMessage(ws, message); - co_await asio::steady_timer(pubsubThread.ioContext, PING_PERIOD).async_wait(asio::use_awaitable); - if (lastPongReceivedAt < sentPingAt) { - throw NoPingAnswerException(); - } - } -} - -asio::awaitable PubsubListener::asyncReadMessages(WebsocketStream& ws) { - while (true) { - json::value message = co_await asyncReadMessage(ws); - std::string type = value_to(message.at("type")); - if (type == "PONG") { - lastPongReceivedAt = std::chrono::steady_clock::now(); - } else if (type == "MESSAGE") { - std::string topic = value_to(message.at("data").at("topic")); - if (!topic.starts_with(CHANNEL_POINTS_TOPIC)) { - continue; - } - - json::value rewardMessage = json::parse(value_to(message.at("data").at("message"))); - std::string rewardMessageType = value_to(rewardMessage.at("type")); - if (rewardMessageType != "reward-redeemed") { - continue; - } - - json::value redemption = rewardMessage.at("data").at("redemption"); - Reward reward = TwitchRewardsApi::parsePubsubReward(redemption.at("reward")); - std::string redemptionId = value_to(redemption.at("id")); - rewardRedemptionQueue.queueRewardRedemption(RewardRedemption{reward, redemptionId}); - } - } -} - -asio::awaitable PubsubListener::asyncReadMessage(WebsocketStream& ws) { - std::string message; - auto buffer = asio::dynamic_buffer(message); - co_await ws.async_read(buffer, asio::use_awaitable); - if (message.empty()) { - co_return json::value{}; - } - co_return json::parse(message); -} - -asio::awaitable PubsubListener::asyncSendMessage(WebsocketStream& ws, const json::value& message) { - std::string messageSerialized = json::serialize(message); - co_await ws.async_write(asio::buffer(messageSerialized), asio::use_awaitable); -} diff --git a/src/PubsubListener.h b/src/PubsubListener.h deleted file mode 100644 index 6b12942..0000000 --- a/src/PubsubListener.h +++ /dev/null @@ -1,47 +0,0 @@ -// SPDX-License-Identifier: GPL-3.0-only -// Copyright (c) 2023, Lev Leontev - -#pragma once -#include -#include -#include -#include - -#include "BoostAsio.h" -#include "IoThreadPool.h" -#include "RewardRedemptionQueue.h" -#include "TwitchAuth.h" - -/// Listens to channel points redemptions. Read https://dev.twitch.tv/docs/pubsub/ for API documentation. -class PubsubListener : public QObject { - Q_OBJECT - -public: - PubsubListener(TwitchAuth& twitchAuth, RewardRedemptionQueue& rewardRedemptionQueue); - ~PubsubListener(); - -private slots: - void reconnectAfterUsernameChange(); - -private: - using WebsocketStream = boost::beast::websocket::stream>; - - class NoPingAnswerException : public std::exception { - const char* what() const noexcept override; - }; - - boost::asio::awaitable asyncReconnectToPubsubForever(); - boost::asio::awaitable asyncConnectToPubsub(const std::string& username); - boost::asio::awaitable asyncConnect(const std::string& host); - boost::asio::awaitable asyncSubscribeToChannelPoints(WebsocketStream& ws); - boost::asio::awaitable asyncSendPingMessages(WebsocketStream& ws); - boost::asio::awaitable asyncReadMessages(WebsocketStream& ws); - static boost::asio::awaitable asyncReadMessage(WebsocketStream& ws); - static boost::asio::awaitable asyncSendMessage(WebsocketStream& ws, const boost::json::value& message); - - TwitchAuth& twitchAuth; - RewardRedemptionQueue& rewardRedemptionQueue; - IoThreadPool pubsubThread; - boost::asio::deadline_timer usernameCondVar; - std::chrono::steady_clock::time_point lastPongReceivedAt; -}; diff --git a/src/RewardsTheaterPlugin.cpp b/src/RewardsTheaterPlugin.cpp index b9d4719..dfee392 100644 --- a/src/RewardsTheaterPlugin.cpp +++ b/src/RewardsTheaterPlugin.cpp @@ -39,7 +39,7 @@ RewardsTheaterPlugin::RewardsTheaterPlugin() ), twitchRewardsApi(twitchAuth, httpClient, settings, ioThreadPool.ioContext), githubUpdateApi(httpClient, ioThreadPool.ioContext), rewardRedemptionQueue(settings, twitchRewardsApi), - pubsubListener(twitchAuth, rewardRedemptionQueue) { + eventsubListener(twitchAuth, httpClient, rewardRedemptionQueue) { checkMinObsVersion(); checkRestrictedRegion(); diff --git a/src/RewardsTheaterPlugin.h b/src/RewardsTheaterPlugin.h index 66c9538..31488d6 100644 --- a/src/RewardsTheaterPlugin.h +++ b/src/RewardsTheaterPlugin.h @@ -7,10 +7,10 @@ #include +#include "EventsubListener.h" #include "GithubUpdateApi.h" #include "HttpClient.h" #include "IoThreadPool.h" -#include "PubsubListener.h" #include "RewardRedemptionQueue.h" #include "Settings.h" #include "TwitchAuth.h" @@ -46,5 +46,5 @@ class RewardsTheaterPlugin { TwitchRewardsApi twitchRewardsApi; GithubUpdateApi githubUpdateApi; RewardRedemptionQueue rewardRedemptionQueue; - PubsubListener pubsubListener; + EventsubListener eventsubListener; }; diff --git a/src/TwitchRewardsApi.cpp b/src/TwitchRewardsApi.cpp index 2cf27ce..d81a9f1 100644 --- a/src/TwitchRewardsApi.cpp +++ b/src/TwitchRewardsApi.cpp @@ -65,19 +65,19 @@ void TwitchRewardsApi::updateRedemptionStatus(const RewardRedemption& rewardRede asio::co_spawn(ioContext, asyncUpdateRedemptionStatus(rewardRedemption, status), asio::detached); } -Reward TwitchRewardsApi::parsePubsubReward(const json::value& reward) { - // The format of the reward for PubSub events differs slightly from the Helix one, hence we need another function. +Reward TwitchRewardsApi::parseEventsubReward(const json::value& reward) { + // EventSub only provides the first four fields return Reward{ value_to(reward.at("id")), value_to(reward.at("title")), value_to(reward.at("prompt")), value_to(reward.at("cost")), - getImageUrl(reward), - reward.at("is_enabled").as_bool(), - value_to(reward.at("background_color")), - getOptionalSetting(reward.at("max_per_stream"), "max_per_stream"), - getOptionalSetting(reward.at("max_per_user_per_stream"), "max_per_user_per_stream"), - getOptionalSetting(reward.at("global_cooldown"), "global_cooldown_seconds"), + boost::urls::url{}, + true, + Color{}, + std::nullopt, + std::nullopt, + std::nullopt, false, }; } diff --git a/src/TwitchRewardsApi.h b/src/TwitchRewardsApi.h index adb798c..4de94bb 100644 --- a/src/TwitchRewardsApi.h +++ b/src/TwitchRewardsApi.h @@ -52,7 +52,7 @@ class TwitchRewardsApi : public QObject { }; void updateRedemptionStatus(const RewardRedemption& rewardRedemption, RedemptionStatus status); - static Reward parsePubsubReward(const boost::json::value& reward); + static Reward parseEventsubReward(const boost::json::value& reward); class EmptyRewardTitleException : public std::exception { public: