Skip to content

Commit

Permalink
Fixes for position with nested NoopStreams (#203)
Browse files Browse the repository at this point in the history
  • Loading branch information
nhz2 authored Apr 6, 2024
1 parent 9617908 commit 182b14f
Show file tree
Hide file tree
Showing 8 changed files with 89 additions and 21 deletions.
7 changes: 7 additions & 0 deletions .github/workflows/CI.yml
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,13 @@ jobs:
- windows-latest
arch:
- x64
include:
- os: ubuntu-latest
version: '1'
arch: x86
- os: macOS-14
version: '1'
arch: aarch64
steps:
- uses: actions/checkout@v4
- uses: julia-actions/setup-julia@v1
Expand Down
1 change: 1 addition & 0 deletions ext/TestExt.jl
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ function TranscodingStreams.test_chunked_read(Encoder, Decoder)
for chunk in chunks
stream = TranscodingStream(Decoder(), buffer, stop_on_end=true)
ok &= read(stream) == chunk
ok &= position(stream) == length(chunk)
ok &= eof(stream)
ok &= isreadable(stream)
close(stream)
Expand Down
3 changes: 1 addition & 2 deletions fuzz/fuzz.jl
Original file line number Diff line number Diff line change
Expand Up @@ -120,8 +120,7 @@ end
for r in rs
d = r(stream)
append!(x, d)
# TODO fix position
# length(x) == position(stream) || return false
length(x) == position(stream) || return false
end
x == data[eachindex(x)]
end
Expand Down
36 changes: 28 additions & 8 deletions src/noop.jl
Original file line number Diff line number Diff line change
Expand Up @@ -53,16 +53,18 @@ Note that this method may return a wrong position when
- some data have been inserted by `TranscodingStreams.unread`, or
- the position of the wrapped stream has been changed outside of this package.
"""
function Base.position(stream::NoopStream)
function Base.position(stream::NoopStream)::Int64
mode = stream.state.mode
if mode === :idle
if !isopen(stream)
throw_invalid_mode(mode)
elseif mode === :idle
return Int64(0)
elseif has_sharedbuf(stream)
return position(stream.stream)
elseif mode === :write
return position(stream.stream) + buffersize(stream.buffer1)
elseif mode === :read
else # read
return position(stream.stream) - buffersize(stream.buffer1)
else
throw_invalid_mode(mode)
end
@assert false "unreachable"
end
Expand Down Expand Up @@ -97,16 +99,34 @@ function Base.seekend(stream::NoopStream)
return stream
end

function Base.unsafe_write(stream::NoopStream, input::Ptr{UInt8}, nbytes::UInt)
function Base.write(stream::NoopStream, b::UInt8)::Int
changemode!(stream, :write)
if has_sharedbuf(stream)
# directly write data to the underlying stream
n = Int(write(stream.stream, b))
return n
end
buffer1 = stream.buffer1
marginsize(buffer1) > 0 || flushbuffer(stream)
return writebyte!(buffer1, b)
end

function Base.unsafe_write(stream::NoopStream, input::Ptr{UInt8}, nbytes::UInt)::Int
changemode!(stream, :write)
if has_sharedbuf(stream)
# directly write data to the underlying stream
n = Int(unsafe_write(stream.stream, input, nbytes))
return n
end
buffer = stream.buffer1
if marginsize(buffer) nbytes
copydata!(buffer, input, nbytes)
return Int(nbytes)
else
flushbuffer(stream)
# directly write data to the underlying stream
return unsafe_write(stream.stream, input, nbytes)
n = Int(unsafe_write(stream.stream, input, nbytes))
return n
end
end

Expand Down Expand Up @@ -152,7 +172,7 @@ function fillbuffer(stream::NoopStream; eager::Bool = false)::Int
changemode!(stream, :read)
buffer = stream.buffer1
@assert buffer === stream.buffer2
if stream.stream isa TranscodingStream && buffer === stream.stream.buffer1
if has_sharedbuf(stream)
# Delegate the operation when buffers are shared.
underlying_mode::Symbol = stream.stream.state.mode
if underlying_mode === :idle || underlying_mode === :read
Expand Down
8 changes: 6 additions & 2 deletions src/stream.jl
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,10 @@ end
# throw ArgumentError that mode is invalid.
throw_invalid_mode(mode) = throw(ArgumentError(string("invalid mode :", mode)))

# Return true if the stream shares buffers with underlying stream
function has_sharedbuf(stream::TranscodingStream)::Bool
stream.stream isa TranscodingStream && stream.buffer2 === stream.stream.buffer1
end

# Base IO Functions
# -----------------
Expand Down Expand Up @@ -264,7 +268,7 @@ function Base.position(stream::TranscodingStream)
mode = stream.state.mode
if mode === :idle
return Int64(0)
elseif mode === :read
elseif mode === :read || mode === :stop
return stats(stream).out
elseif mode === :write
return stats(stream).in
Expand Down Expand Up @@ -584,7 +588,7 @@ function stats(stream::TranscodingStream)
buffer2 = stream.buffer2
if mode === :idle
transcoded_in = transcoded_out = in = out = 0
elseif mode === :read
elseif mode === :read || mode === :stop
transcoded_in = buffer2.transcoded
transcoded_out = buffer1.transcoded
in = transcoded_in + buffersize(buffer2)
Expand Down
2 changes: 2 additions & 0 deletions test/codecdoubleframe.jl
Original file line number Diff line number Diff line change
Expand Up @@ -272,6 +272,7 @@ DoubleFrameDecoderStream(stream::IO; kwargs...) = TranscodingStream(DoubleFrameD
)
))
@test read(s1) == b""
@test position(s1) == 0
@test eof(s1)

s2 = NoopStream(
Expand All @@ -281,6 +282,7 @@ DoubleFrameDecoderStream(stream::IO; kwargs...) = TranscodingStream(DoubleFrameD
)
)
@test read(s2) == b""
@test position(s1) == 0
@test eof(s2)
end

Expand Down
41 changes: 38 additions & 3 deletions test/codecnoop.jl
Original file line number Diff line number Diff line change
Expand Up @@ -50,11 +50,11 @@
close(stream)

stream = TranscodingStream(Noop(), IOBuffer(b"foobarbaz"))
@test position(stream) === 0
@test position(stream) === Int64(0)
read(stream, UInt8)
@test position(stream) === 1
@test position(stream) === Int64(1)
read(stream)
@test position(stream) === 9
@test position(stream) === Int64(9)

data = collect(0x00:0x0f)
stream = TranscodingStream(Noop(), IOBuffer(data))
Expand Down Expand Up @@ -368,6 +368,41 @@
@test position(stream) == pos
end
end

@testset "writing nested NoopStream sharedbuf=$(sharedbuf)" for sharedbuf in (true, false)
stream = NoopStream(NoopStream(IOBuffer()); sharedbuf, bufsize=4)
@test position(stream) == 0
write(stream, 0x01)
@test position(stream) == 1
flush(stream)
@test position(stream) == 1
write(stream, "abc")
@test position(stream) == 4
flush(stream)
@test position(stream) == 4
for i in 1:10
write(stream, 0x01)
@test position(stream) == 4 + i
end
end

@testset "reading nested NoopStream sharedbuf=$(sharedbuf)" for sharedbuf in (true, false)
stream = NoopStream(NoopStream(IOBuffer("abcdefghijk")); sharedbuf, bufsize=4)
@test position(stream) == 0
@test !eof(stream)
@test position(stream) == 0
@test read(stream, UInt8) == b"a"[1]
@test position(stream) == 1
@test read(stream, 3) == b"bcd"
@test position(stream) == 4
@test !eof(stream)
@test position(stream) == 4
@test read(stream) == b"efghijk"
@test position(stream) == 11
@test eof(stream)
@test position(stream) == 11
end

end

@testset "seek doesn't delete data" begin
Expand Down
12 changes: 6 additions & 6 deletions test/codecquadruple.jl
Original file line number Diff line number Diff line change
Expand Up @@ -51,19 +51,19 @@ end
close(stream)

stream = TranscodingStream(QuadrupleCodec(), IOBuffer("foo"))
@test position(stream) === 0
@test position(stream) === Int64(0)
read(stream, 3)
@test position(stream) === 3
@test position(stream) === Int64(3)
read(stream, UInt8)
@test position(stream) === 4
@test position(stream) === Int64(4)
close(stream)

stream = TranscodingStream(QuadrupleCodec(), IOBuffer())
@test position(stream) === 0
@test position(stream) === Int64(0)
write(stream, 0x00)
@test position(stream) === 1
@test position(stream) === Int64(1)
write(stream, "foo")
@test position(stream) === 4
@test position(stream) === Int64(4)
close(stream)

# Buffers are shared.
Expand Down

0 comments on commit 182b14f

Please sign in to comment.