Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add some verbose logs to video download threads #1033

Merged
merged 6 commits into from
Apr 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ internal abstract class TwitchDownloaderArgs
[Option("banner", Default = true, HelpText = "Displays a banner containing version and copyright information.")]
public bool? ShowBanner { get; set; }

[Option("log-level", Default = Models.LogLevel.Status | LogLevel.Info | LogLevel.Warning | LogLevel.Error, HelpText = "Sets the log level flags. Applicable values are: None, Status, Verbose, Info, Warning, Error, Ffmpeg.")]
[Option("log-level", Default = LogLevel.Status | LogLevel.Info | LogLevel.Warning | LogLevel.Error, HelpText = "Sets the log level flags. Applicable values are: None, Status, Verbose, Info, Warning, Error, Ffmpeg.")]
public LogLevel LogLevel { get; set; }
}
}
1 change: 0 additions & 1 deletion TwitchDownloaderCore/Interfaces/ITaskLogger.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ namespace TwitchDownloaderCore.Interfaces
{
public interface ITaskLogger
{
// TODO: Add DefaultInterpolatedStringHandler overloads once log levels are implemented for zero-alloc logging
void LogVerbose(string logMessage);
void LogVerbose(DefaultInterpolatedStringHandler logMessage);
void LogInfo(string logMessage);
Expand Down
94 changes: 94 additions & 0 deletions TwitchDownloaderCore/Tools/DownloadTools.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
using System;
using System.IO;
using System.Net.Http;
using System.Threading;
using System.Threading.Tasks;
using TwitchDownloaderCore.Interfaces;

namespace TwitchDownloaderCore.Tools
{
public static class DownloadTools
{
/// <summary>
/// Downloads the requested <paramref name="url"/> to the <paramref name="destinationFile"/> without storing it in memory.
/// </summary>
/// <param name="httpClient">The <see cref="HttpClient"/> to perform the download operation.</param>
/// <param name="url">The url of the file to download.</param>
/// <param name="destinationFile">The path to the file where download will be saved.</param>
/// <param name="throttleKib">The maximum download speed in kibibytes per second, or -1 for no maximum.</param>
/// <param name="logger">Logger.</param>
/// <param name="cancellationTokenSource">A <see cref="CancellationTokenSource"/> containing a <see cref="CancellationToken"/> to cancel the operation.</param>
/// <remarks>The <paramref name="cancellationTokenSource"/> may be canceled by this method.</remarks>
public static async Task DownloadFileAsync(HttpClient httpClient, Uri url, string destinationFile, int throttleKib, ITaskLogger logger, CancellationTokenSource cancellationTokenSource = null)
{
var request = new HttpRequestMessage(HttpMethod.Get, url);

var cancellationToken = cancellationTokenSource?.Token ?? CancellationToken.None;

using var response = await httpClient.SendAsync(request, HttpCompletionOption.ResponseHeadersRead, cancellationToken).ConfigureAwait(false);
response.EnsureSuccessStatusCode();

// Why are we setting a CTS CancelAfter timer? See lay295#265
const int SIXTY_SECONDS = 60;
if (throttleKib == -1 || !response.Content.Headers.ContentLength.HasValue)
{
cancellationTokenSource?.CancelAfter(TimeSpan.FromSeconds(SIXTY_SECONDS));
}
else
{
const double ONE_KIBIBYTE = 1024d;
cancellationTokenSource?.CancelAfter(TimeSpan.FromSeconds(Math.Max(
SIXTY_SECONDS,
response.Content.Headers.ContentLength!.Value / ONE_KIBIBYTE / throttleKib * 8 // Allow up to 8x the shortest download time given the thread bandwidth
)));
}

switch (throttleKib)
{
case -1:
{
await using var fs = new FileStream(destinationFile, FileMode.Create, FileAccess.Write, FileShare.Read);
await response.Content.CopyToAsync(fs, cancellationToken).ConfigureAwait(false);
break;
}
default:
{
try
{
await using var contentStream = await response.Content.ReadAsStreamAsync(cancellationToken);
await using var throttledStream = new ThrottledStream(contentStream, throttleKib);
await using var fs = new FileStream(destinationFile, FileMode.Create, FileAccess.Write, FileShare.Read);
await throttledStream.CopyToAsync(fs, cancellationToken).ConfigureAwait(false);
}
catch (IOException ex) when (ex.Message.Contains("EOF"))
{
// If we get an exception for EOF, it may be related to the throttler. Try again without it.
logger.LogVerbose($"Unexpected EOF, retrying without bandwidth throttle. Message: {ex.Message}.");
await Task.Delay(2_000, cancellationToken);
goto case -1;
}
break;
}
}

// Reset the cts timer so it can be reused for the next download on this thread.
// Is there a friendlier way to do this? Yes. Does it involve creating and destroying 4,000 CancellationTokenSources that are almost never cancelled? Also Yes.
cancellationTokenSource?.CancelAfter(TimeSpan.FromMilliseconds(uint.MaxValue - 1));
}


/// <summary>
/// Some old twitch VODs have files with a query string at the end such as 1.ts?offset=blah which isn't a valid filename
/// </summary>
public static string RemoveQueryString(string inputString)
{
var queryIndex = inputString.IndexOf('?');
if (queryIndex == -1)
{
return inputString;
}

return inputString[..queryIndex];
}
}
}
153 changes: 153 additions & 0 deletions TwitchDownloaderCore/Tools/VideoDownloadThread.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,153 @@
using System;
using System.Collections.Concurrent;
using System.IO;
using System.Net;
using System.Net.Http;
using System.Threading;
using System.Threading.Tasks;
using TwitchDownloaderCore.Interfaces;

namespace TwitchDownloaderCore.Tools
{
internal sealed record VideoDownloadThread
{
private readonly ConcurrentQueue<string> _videoPartsQueue;
private readonly HttpClient _client;
private readonly Uri _baseUrl;
private readonly string _cacheFolder;
private readonly DateTimeOffset _vodAirDate;
private TimeSpan VodAge => DateTimeOffset.UtcNow - _vodAirDate;
private readonly int _throttleKib;
private readonly ITaskLogger _logger;
private readonly CancellationToken _cancellationToken;
public Task ThreadTask { get; private set; }

public VideoDownloadThread(ConcurrentQueue<string> videoPartsQueue, HttpClient httpClient, Uri baseUrl, string cacheFolder, DateTimeOffset vodAirDate, int throttleKib, ITaskLogger logger, CancellationToken cancellationToken)
{
_videoPartsQueue = videoPartsQueue;
_client = httpClient;
_baseUrl = baseUrl;
_cacheFolder = cacheFolder;
_vodAirDate = vodAirDate;
_throttleKib = throttleKib;
_logger = logger;
_cancellationToken = cancellationToken;
StartDownload();
}

public void StartDownload()
{
if (ThreadTask is { Status: TaskStatus.Created or TaskStatus.WaitingForActivation or TaskStatus.WaitingToRun or TaskStatus.Running })
{
throw new InvalidOperationException($"Tried to start a thread that was already running or waiting to run ({ThreadTask.Status}).");
}

ThreadTask = Task.Factory.StartNew(
ExecuteDownloadThread,
_cancellationToken,
TaskCreationOptions.LongRunning,
TaskScheduler.Current);
}

private void ExecuteDownloadThread()
{
using var cts = new CancellationTokenSource();
_cancellationToken.Register(PropagateCancel, cts);

while (!_videoPartsQueue.IsEmpty)
{
_cancellationToken.ThrowIfCancellationRequested();

string videoPart = null;
try
{
if (_videoPartsQueue.TryDequeue(out videoPart))
{
DownloadVideoPartAsync(videoPart, cts).GetAwaiter().GetResult();
}
}
catch
{
if (videoPart != null && !_cancellationToken.IsCancellationRequested)
{
// Requeue the video part now instead of deferring to the verifier since we already know it's bad
_videoPartsQueue.Enqueue(videoPart);
}

throw;
}

const int A_PRIME_NUMBER = 71;
Thread.Sleep(A_PRIME_NUMBER);
}
}

private static void PropagateCancel(object tokenSourceToCancel)
{
try
{
(tokenSourceToCancel as CancellationTokenSource)?.Cancel();
}
catch (ObjectDisposedException) { }
}

/// <remarks>The <paramref name="cancellationTokenSource"/> may be canceled by this method.</remarks>
private async Task DownloadVideoPartAsync(string videoPartName, CancellationTokenSource cancellationTokenSource)
{
var tryUnmute = VodAge < TimeSpan.FromHours(24);
var errorCount = 0;
var timeoutCount = 0;
while (true)
{
cancellationTokenSource.Token.ThrowIfCancellationRequested();

try
{
var partFile = Path.Combine(_cacheFolder, DownloadTools.RemoveQueryString(videoPartName));
if (tryUnmute && videoPartName.Contains("-muted"))
{
var unmutedPartName = videoPartName.Replace("-muted", "");
await DownloadTools.DownloadFileAsync(_client, new Uri(_baseUrl, unmutedPartName), partFile, _throttleKib, _logger, cancellationTokenSource);
}
else
{
await DownloadTools.DownloadFileAsync(_client, new Uri(_baseUrl, videoPartName), partFile, _throttleKib, _logger, cancellationTokenSource);
}

return;
}
catch (HttpRequestException ex) when (tryUnmute && ex.StatusCode is HttpStatusCode.Forbidden)
{
_logger.LogVerbose($"Received {ex.StatusCode}: {ex.StatusCode} when trying to unmute {videoPartName}. Disabling {nameof(tryUnmute)}.");
tryUnmute = false;

await Task.Delay(100, cancellationTokenSource.Token);
}
catch (HttpRequestException ex)
{
const int MAX_RETRIES = 10;

_logger.LogVerbose($"Received {(int)(ex.StatusCode ?? 0)}: {ex.StatusCode} for {videoPartName}. {MAX_RETRIES - (errorCount + 1)} retries left.");
if (++errorCount > MAX_RETRIES)
{
throw new HttpRequestException($"Video part {videoPartName} failed after {MAX_RETRIES} retries");
}

await Task.Delay(1_000 * errorCount, cancellationTokenSource.Token);
}
catch (TaskCanceledException ex) when (ex.Message.Contains("HttpClient.Timeout"))
{
const int MAX_RETRIES = 3;

_logger.LogVerbose($"{videoPartName} timed out. {MAX_RETRIES - (timeoutCount + 1)} retries left.");
if (++timeoutCount > MAX_RETRIES)
{
throw new HttpRequestException($"Video part {videoPartName} timed out {MAX_RETRIES} times");
}

await Task.Delay(5_000 * timeoutCount, cancellationTokenSource.Token);
}
}
}
}
}
Loading