Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for an external signature server #4772

Merged
merged 11 commits into from
Aug 13, 2024
15 changes: 0 additions & 15 deletions config/config.example.yml
Original file line number Diff line number Diff line change
Expand Up @@ -343,21 +343,6 @@ full_refresh: false
##
feed_threads: 1

##
## Enable/Disable the polling job that keeps the decryption
## function (for "secured" videos) up to date.
##
## Note: This part of the code generate a small amount of data every minute.
## This may not be desired if you have bandwidth limits set by your ISP.
##
## Note 2: This part of the code is currently broken, so changing
## this setting has no impact.
##
## Accepted values: true, false
## Default: false
##
#decrypt_polling: false


jobs:

Expand Down
5 changes: 0 additions & 5 deletions src/invidious.cr
Original file line number Diff line number Diff line change
Expand Up @@ -163,11 +163,6 @@ if CONFIG.feed_threads > 0
Invidious::Jobs.register Invidious::Jobs::RefreshFeedsJob.new(PG_DB)
end

DECRYPT_FUNCTION = DecryptFunction.new(CONFIG.decrypt_polling)
if CONFIG.decrypt_polling
Invidious::Jobs.register Invidious::Jobs::UpdateDecryptFunctionJob.new
end

if CONFIG.statistics_enabled
Invidious::Jobs.register Invidious::Jobs::StatisticsRefreshJob.new(PG_DB, SOFTWARE)
end
Expand Down
2 changes: 0 additions & 2 deletions src/invidious/config.cr
Original file line number Diff line number Diff line change
Expand Up @@ -74,8 +74,6 @@ class Config
# Database configuration using 12-Factor "Database URL" syntax
@[YAML::Field(converter: Preferences::URIConverter)]
property database_url : URI = URI.parse("")
# Use polling to keep decryption function up to date
property decrypt_polling : Bool = false
# Used for crawling channels: threads should check all videos uploaded by a channel
property full_refresh : Bool = false

Expand Down
8 changes: 4 additions & 4 deletions src/invidious/helpers/crystal_class_overrides.cr
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,9 @@
# IPv6 addresses.
#
class TCPSocket
def initialize(host : String, port, dns_timeout = nil, connect_timeout = nil, family = Socket::Family::UNSPEC)
def initialize(host, port, dns_timeout = nil, connect_timeout = nil, blocking = false, family = Socket::Family::UNSPEC)
Addrinfo.tcp(host, port, timeout: dns_timeout, family: family) do |addrinfo|
super(addrinfo.family, addrinfo.type, addrinfo.protocol)
super(addrinfo.family, addrinfo.type, addrinfo.protocol, blocking)
connect(addrinfo, timeout: connect_timeout) do |error|
close
error
Expand All @@ -26,7 +26,7 @@ class HTTP::Client
end

hostname = @host.starts_with?('[') && @host.ends_with?(']') ? @host[1..-2] : @host
io = TCPSocket.new hostname, @port, @dns_timeout, @connect_timeout, @family
io = TCPSocket.new hostname, @port, @dns_timeout, @connect_timeout, family: @family
io.read_timeout = @read_timeout if @read_timeout
io.write_timeout = @write_timeout if @write_timeout
io.sync = false
Expand All @@ -35,7 +35,7 @@ class HTTP::Client
if tls = @tls
tcp_socket = io
begin
io = OpenSSL::SSL::Socket::Client.new(tcp_socket, context: tls, sync_close: true, hostname: @host)
io = OpenSSL::SSL::Socket::Client.new(tcp_socket, context: tls, sync_close: true, hostname: @host.rchop('.'))
rescue exc
# don't leak the TCP socket when the SSL connection failed
tcp_socket.close
Expand Down
325 changes: 325 additions & 0 deletions src/invidious/helpers/sig_helper.cr
Original file line number Diff line number Diff line change
@@ -0,0 +1,325 @@
require "uri"
require "socket"
require "socket/tcp_socket"
require "socket/unix_socket"

{% if flag?(:advanced_debug) %}
require "io/hexdump"
{% end %}

private alias NetworkEndian = IO::ByteFormat::NetworkEndian

class Invidious::SigHelper
enum UpdateStatus
Updated
UpdateNotRequired
Error
end

# -------------------
# Payload types
# -------------------

abstract struct Payload
end

struct StringPayload < Payload
getter string : String

def initialize(str : String)
raise Exception.new("SigHelper: String can't be empty") if str.empty?
@string = str
end

def self.from_bytes(slice : Bytes)
size = IO::ByteFormat::NetworkEndian.decode(UInt16, slice)
if size == 0 # Error code
raise Exception.new("SigHelper: Server encountered an error")
end

if (slice.bytesize - 2) != size
raise Exception.new("SigHelper: String size mismatch")
end

if str = String.new(slice[2..])
return self.new(str)
else
raise Exception.new("SigHelper: Can't read string from socket")
end
end

def to_io(io)
# `.to_u16` raises if there is an overflow during the conversion
io.write_bytes(@string.bytesize.to_u16, NetworkEndian)
io.write(@string.to_slice)
end
end

private enum Opcode
FORCE_UPDATE = 0
DECRYPT_N_SIGNATURE = 1
DECRYPT_SIGNATURE = 2
GET_SIGNATURE_TIMESTAMP = 3
GET_PLAYER_STATUS = 4
end

private record Request,
opcode : Opcode,
payload : Payload?

# ----------------------
# High-level functions
# ----------------------

module Client
extend self

# Forces the server to re-fetch the YouTube player, and extract the necessary
# components from it (nsig function code, sig function code, signature timestamp).
def force_update : UpdateStatus
request = Request.new(Opcode::FORCE_UPDATE, nil)

value = send_request(request) do |bytes|
IO::ByteFormat::NetworkEndian.decode(UInt16, bytes)
end

case value
when 0x0000 then return UpdateStatus::Error
when 0xFFFF then return UpdateStatus::UpdateNotRequired
when 0xF44F then return UpdateStatus::Updated
else
code = value.nil? ? "nil" : value.to_s(base: 16)
raise Exception.new("SigHelper: Invalid status code received #{code}")
end
end

# Decrypt a provided n signature using the server's current nsig function
# code, and return the result (or an error).
def decrypt_n_param(n : String) : String?
request = Request.new(Opcode::DECRYPT_N_SIGNATURE, StringPayload.new(n))

n_dec = send_request(request) do |bytes|
StringPayload.from_bytes(bytes).string
end

return n_dec
end

# Decrypt a provided s signature using the server's current sig function
# code, and return the result (or an error).
def decrypt_sig(sig : String) : String?
request = Request.new(Opcode::DECRYPT_SIGNATURE, StringPayload.new(sig))

sig_dec = send_request(request) do |bytes|
StringPayload.from_bytes(bytes).string
end

return sig_dec
end

# Return the signature timestamp from the server's current player
def get_sts : UInt64?
request = Request.new(Opcode::GET_SIGNATURE_TIMESTAMP, nil)

return send_request(request) do |bytes|
IO::ByteFormat::NetworkEndian.decode(UInt64, bytes)
end
end

# Return the current player's version
def get_player : UInt32?
request = Request.new(Opcode::GET_PLAYER_STATUS, nil)

send_request(request) do |bytes|
has_player = (bytes[0] == 0xFF)
player_version = IO::ByteFormat::NetworkEndian.decode(UInt32, bytes[1..4])
end

return has_player ? player_version : nil
end

private def send_request(request : Request, &)
channel = Multiplexor::INSTANCE.send(request)
slice = channel.receive
return yield slice
rescue ex
LOGGER.debug("SigHelper: Error when sending a request")
LOGGER.trace(ex.inspect_with_backtrace)
return nil
end
end

# ---------------------
# Low level functions
# ---------------------

class Multiplexor
alias TransactionID = UInt32
record Transaction, channel = ::Channel(Bytes).new

@prng = Random.new
@mutex = Mutex.new
@queue = {} of TransactionID => Transaction

@conn : Connection

INSTANCE = new("")

def initialize(url : String)
@conn = Connection.new(url)
listen
end

def listen : Nil
raise "Socket is closed" if @conn.closed?

LOGGER.debug("SigHelper: Multiplexor listening")

# TODO: reopen socket if unexpectedly closed
spawn do
loop do
receive_data
Fiber.yield
end
end
end

def send(request : Request)
transaction = Transaction.new
transaction_id = @prng.rand(TransactionID)

# Add transaction to queue
@mutex.synchronize do
# On a 32-bits random integer, this should never happen. Though, just in case, ...
if @queue[transaction_id]?
raise Exception.new("SigHelper: Duplicate transaction ID! You got a shiny pokemon!")
end

@queue[transaction_id] = transaction
end

write_packet(transaction_id, request)

return transaction.channel
end

def receive_data
transaction_id, slice = read_packet

@mutex.synchronize do
if transaction = @queue.delete(transaction_id)
# Remove transaction from queue and send data to the channel
transaction.channel.send(slice)
LOGGER.trace("SigHelper: Transaction unqueued and data sent to channel")
else
raise Exception.new("SigHelper: Received transaction was not in queue")
end
end
end

# Read a single packet from the socket
private def read_packet : {TransactionID, Bytes}
# Header
transaction_id = @conn.read_bytes(UInt32, NetworkEndian)
length = @conn.read_bytes(UInt32, NetworkEndian)

LOGGER.trace("SigHelper: Recv transaction 0x#{transaction_id.to_s(base: 16)} / length #{length}")

if length > 67_000
raise Exception.new("SigHelper: Packet longer than expected (#{length})")
end

# Payload
slice = Bytes.new(length)
@conn.read(slice) if length > 0

LOGGER.trace("SigHelper: payload = #{slice}")
LOGGER.trace("SigHelper: Recv transaction 0x#{transaction_id.to_s(base: 16)} - Done")

return transaction_id, slice
end

# Write a single packet to the socket
private def write_packet(transaction_id : TransactionID, request : Request)
LOGGER.trace("SigHelper: Send transaction 0x#{transaction_id.to_s(base: 16)} / opcode #{request.opcode}")

io = IO::Memory.new(1024)
io.write_bytes(request.opcode.to_u8, NetworkEndian)
io.write_bytes(transaction_id, NetworkEndian)

if payload = request.payload
payload.to_io(io)
end

@conn.send(io)
@conn.flush

LOGGER.trace("SigHelper: Send transaction 0x#{transaction_id.to_s(base: 16)} - Done")
end
end

class Connection
@socket : UNIXSocket | TCPSocket

{% if flag?(:advanced_debug) %}
@io : IO::Hexdump
{% end %}

def initialize(host_or_path : String)
if host_or_path.empty?
host_or_path = "/tmp/inv_sig_helper.sock"
end

case host_or_path
when .starts_with?('/')
@socket = UNIXSocket.new(host_or_path)
when .starts_with?("tcp://")
uri = URI.parse(host_or_path)
@socket = TCPSocket.new(uri.host.not_nil!, uri.port.not_nil!)
else
uri = URI.parse("tcp://#{host_or_path}")
@socket = TCPSocket.new(uri.host.not_nil!, uri.port.not_nil!)
end

LOGGER.debug("SigHelper: Listening on '#{host_or_path}'")

{% if flag?(:advanced_debug) %}
@io = IO::Hexdump.new(@socket, output: STDERR, read: true, write: true)
{% end %}

@socket.sync = false
@socket.blocking = false
end

def closed? : Bool
return @socket.closed?
end

def close : Nil
if @socket.closed?
raise Exception.new("SigHelper: Can't close socket, it's already closed")
else
@socket.close
end
end

def flush(*args, **options)
@socket.flush(*args, **options)
end

def send(*args, **options)
@socket.send(*args, **options)
end

# Wrap IO functions, with added debug tooling if needed
{% for function in %w(read read_bytes write write_bytes) %}
def {{function.id}}(*args, **options)
{% if flag?(:advanced_debug) %}
@io.{{function.id}}(*args, **options)
{% else %}
@socket.{{function.id}}(*args, **options)
{% end %}
end
{% end %}
end
end
Loading
Loading