-
-
Notifications
You must be signed in to change notification settings - Fork 5.5k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
copyuntil(out::IO, in::IO, delim) #48273
Conversation
I think it looks great, especially if it's a performance boon. Thanks for working on this. |
Yes, they do have an internal buffer, though I'm not sure how large it is. That's the main reason for this PR — there's no good way to re-implement As I mentioned, the alternative is to do some manual buffering, e.g. reading a file in 4k chunks similar to #42225. Of course, the advantage of manual buffering ala #42225 is that it can be implemented in pure Julia code in a package. On the the other hand, the big disadvantage of manual buffering is that it is problematic for non-seekable streams (e.g. pipes) — if you want to read only a few lines, and then do something else with the remainder of the file, the user wants to be able to read from the end of the last line, not from the end of the last buffer chunk. (You can also use BufferedStreams.jl for additional buffering. That's composable with this PR, though in the long run we may want to add a more specialized method for |
I ran some benchmarks on
So, only about a 30% speed improvement by eliminating the On the other hand, it is almost 30× slower if you use a plain If this PR is merged, it would be good to update BufferedStreams.jl to add a specialized Benchmark code using StringViews, BenchmarkTools, BufferedStreams
io = open("HISTORY.md", "r")
@show sum(length, eachline(seekstart(io), keep=true))
@btime sum(length, eachline(seekstart($io), keep=true))
function doit!(io, buf, delim)
s = 0
while !eof(io)
n = readuntil!(io, buf, delim)
s += length(StringView(@view buf[1:n]))
end
return s
end
buf = Array{UInt8}(undef, 1024)
@show doit!(seekstart(io), buf, '\n')
@btime doit!(seekstart($io), $buf, '\n')
bio = BufferedInputStream(seekstart(io))
@show sum(length, eachline((seekstart(bio); bio), keep=true))
@btime sum(length, eachline((seekstart($bio); $bio), keep=true))
@show doit!((seekstart(bio); bio), buf, '\n')
@btime doit!((seekstart($bio); $bio), $buf, '\n')
close(bio) |
I added an optimized iob = IOBuffer(read("HISTORY.md"))
@btime sum(length, eachline(seekstart($iob), keep=true)) previously gave If I use an One complication is that the optimal strategy for Of course, if you have all of your data in an in-memory buffer already, it is almost certainly even better to use the |
Should be ready for review. |
cc @rickbeeloo, author of the https://github.com/rickbeeloo/ViewReader package. On my machine, Once we settle on an API, however, BufferedStreams.jl should be able to implement a specialized |
I was thinking about it some more, and arguably if you want to have maximum performance for an in-place From that perspective, however, a |
True, and indeed something like Rust's std::Io::BufReader might be nice to have. However, how would you keep track of whether it's safe to shift data in the buffer, given that StringViews may hold a reference to any data in the buffer? |
I also agree reading bigger chunks would do better, especially for those reading from HDDs
This works as long as the "to be finished line" is shorter than the currently allocated buffer. If not it might again not find the newline. That would require some extra logic to see if the newline is found yet and otherwise still increase the buffer and extend. Which, in the worst case would give a buffer ~2x the longest line in the file. I didn't really spend time optimizing that, instead, I allocated two buffers (each being the max line length - given by the user) then flip them around like you said and warn if no newline is found after
|
Marking for |
I was thinking about this some more in connection with #48625, and I'm starting to feel that a better API would be: readuntil(out::IO, in::IO, delim) i.e. add an optional
Philosophically, an |
Updated to use the new |
There is already |
@fredrikekre, I think it's also more discoverable to stick to the convention that you can add an For example, we already have |
This is great functionality, triage is 👍 . However, we do like the idea of calling it |
Posting a reference to the IO blocking behavior issue for future reference #24526 |
|
Co-authored-by: Rafael Fourquet <[email protected]>
Co-authored-by: Rafael Fourquet <[email protected]>
c50ee4e
to
13bbd48
Compare
I'm having trouble seeing how the Asan segfault (in (I added an additional parameter |
end | ||
(eof(s) || len == out.maxsize) && break | ||
len = min(2len + 64, out.maxsize) | ||
resize!(d, len) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This seems like it might not quite respect maxsize (not copying the last chunk if it exceeded maxsize during that read). Should it use ensureroom
instead? The ensureroom
call at the top also seems wrong, since if someone new the out
stream was already sized correctly for a smaller read, that will forceably try to reallocate it.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm not sure what you mean by "not copying the last chunk". By the time it hits the len == out.maxsize
branch, jl_readuntil_buf
has already copied out.maxsize
bytes into the buffer. The jl_readuntil_buf
never reads a chunk exceeding maxsize
, because the number of bytes that it reads is bounded by len - ptr + 1
.
ensureroom
seems like it has quite a few extra checks that aren't needed in the inner loop here given the ensureroom
call at the top.
I guess it wouldn't hurt to change the ensureroom
call at the top to something like ensureroom(out, isempty(out.data) ? 16 : 0)
? Hmm, no, that wouldn't work either… I need at least ensureroom(out, 1)
to be certain that the jl_readuntil_buf
will read at least 1 byte if there is something to read, as otherwise the iszero(n) && break
check is wrong. Probably it's fine to just do ensureroom(out, 1)
here. (The main application of this method is probably to read repeatedly into the same seekstart(buf)
, as in the examples above, in which case ensureroom
will do nothing … the buffer will already be as big as the largest line previously read.)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ah, I guess that makes sense. The checks were a bit distant so it wasn't immediately obvious that was being ensured.
Yes, I think checking isempty
first makes sense. I realized now that ensureroom
already truncates the request to maxsize, so there is no issue with calling that.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
should be resolved by #50485
end | ||
return out | ||
end | ||
readuntil(s::IO, delim::T; keep::Bool=false) where T = | ||
_copyuntil(Vector{T}(), s, delim, keep) | ||
readuntil(s::IO, delim::UInt8; keep::Bool=false) = |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This overwrites the definition on line 525, giving a warning during sysimage build.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is there a way to catch those warnings and turn them into errors on CI when building the sysimage? I guess we don't want this kind of situations.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Will file a PR to fix this shortly. Filed #50485
This PR defines and exports
a new functionnew functions:readuntil!(s::IO, buffer::AbstractVector{UInt8}, delim)
that read data from
s
intothebuffer
in-place (resized if needed)out
stream untildelim
is read/written or the end of the stream is reached.The PR was inspired by this post from @jakobnissen: the goal is to make it easier implement an allocation-free
eachline
iterator. This can be done in a package, givenreaduntil
with anIOBuffer
, using theStringViews.jl
package to return a string view of the in-placebuffer
on each iteration.The reason it seemed like this needed a
Base
function, instead of living completely in a package, is thatreaduntil
relies on a low-leveljl_readuntil
C function that would be difficult to replicate in a package. To obtain comparable performance, it seems like we need an analogousjl_readuntil_buf
method (implemented in this PR) and a corresponding Julia API.Moreover, relatively little new code was required because many of the existing
readuntil
methods used anIOBuffer
internally, so it was merely a matter of refactoring and exporting this functionality. Also, we already had an optimized ios_copyuntil function for copying between IOStreams, which can now be exported in the new API.To do:
readbytes!
and then return StringViews on top of that? This could happen completely in a package. It's a lot easier to use something likereaduntil!
, however.)out::IO
variantreadline(out::IO, in::IO)
too?readline(out::IO, in::IO)
methodsBefore I do much more work on this, what do people think?