From e500754118c64ecc16836f426c251582fddbffb5 Mon Sep 17 00:00:00 2001 From: Jameson Nash Date: Wed, 25 Sep 2024 11:22:23 -0400 Subject: [PATCH 1/3] [FileWatching] fix PollingFileWatcher design and add workaround for a stat bug What started as an innocent fix for a stat bug on Apple (#48667) turned into a full blown investigation into the design problems with the libuv backend for PollingFileWatcher, and writing my own implementation of it instead which could avoid those singled-threaded concurrency bugs. --- base/libuv.jl | 8 +- base/reflection.jl | 3 +- base/stat.jl | 111 +++++------ src/sys.c | 1 - stdlib/FileWatching/src/FileWatching.jl | 240 ++++++++++++++---------- stdlib/FileWatching/test/runtests.jl | 9 +- test/file.jl | 10 + 7 files changed, 215 insertions(+), 167 deletions(-) diff --git a/base/libuv.jl b/base/libuv.jl index 3c9f79dfa7b2c..306854e9f4436 100644 --- a/base/libuv.jl +++ b/base/libuv.jl @@ -26,10 +26,10 @@ for r in uv_req_types @eval const $(Symbol("_sizeof_", lowercase(string(r)))) = uv_sizeof_req($r) end -uv_handle_data(handle) = ccall(:jl_uv_handle_data, Ptr{Cvoid}, (Ptr{Cvoid},), handle) -uv_req_data(handle) = ccall(:jl_uv_req_data, Ptr{Cvoid}, (Ptr{Cvoid},), handle) -uv_req_set_data(req, data) = ccall(:jl_uv_req_set_data, Cvoid, (Ptr{Cvoid}, Any), req, data) -uv_req_set_data(req, data::Ptr{Cvoid}) = ccall(:jl_uv_req_set_data, Cvoid, (Ptr{Cvoid}, Ptr{Cvoid}), req, data) +uv_handle_data(handle) = ccall(:uv_handle_get_data, Ptr{Cvoid}, (Ptr{Cvoid},), handle) +uv_req_data(handle) = ccall(:uv_req_get_data, Ptr{Cvoid}, (Ptr{Cvoid},), handle) +uv_req_set_data(req, data) = ccall(:uv_req_set_data, Cvoid, (Ptr{Cvoid}, Any), req, data) +uv_req_set_data(req, data::Ptr{Cvoid}) = ccall(:uv_handle_set_data, Cvoid, (Ptr{Cvoid}, Ptr{Cvoid}), req, data) macro handle_as(hand, typ) return quote diff --git a/base/reflection.jl b/base/reflection.jl index fe48b6f9aa6b9..be0209872db34 100644 --- a/base/reflection.jl +++ b/base/reflection.jl @@ -964,7 +964,7 @@ use it in the following manner to summarize information about a struct: julia> structinfo(T) = [(fieldoffset(T,i), fieldname(T,i), fieldtype(T,i)) for i = 1:fieldcount(T)]; julia> structinfo(Base.Filesystem.StatStruct) -13-element Vector{Tuple{UInt64, Symbol, Type}}: +14-element Vector{Tuple{UInt64, Symbol, Type}}: (0x0000000000000000, :desc, Union{RawFD, String}) (0x0000000000000008, :device, UInt64) (0x0000000000000010, :inode, UInt64) @@ -978,6 +978,7 @@ julia> structinfo(Base.Filesystem.StatStruct) (0x0000000000000050, :blocks, Int64) (0x0000000000000058, :mtime, Float64) (0x0000000000000060, :ctime, Float64) + (0x0000000000000068, :ioerrno, Int32) ``` """ fieldoffset(x::DataType, idx::Integer) = (@_foldable_meta; ccall(:jl_get_field_offset, Csize_t, (Any, Cint), x, idx)) diff --git a/base/stat.jl b/base/stat.jl index 506b5644dccbc..c6fb239a96404 100644 --- a/base/stat.jl +++ b/base/stat.jl @@ -63,6 +63,7 @@ struct StatStruct blocks :: Int64 mtime :: Float64 ctime :: Float64 + ioerrno :: Int32 end @eval function Base.:(==)(x::StatStruct, y::StatStruct) # do not include `desc` in equality or hash @@ -80,22 +81,23 @@ end end) end -StatStruct() = StatStruct("", 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0) -StatStruct(buf::Union{Vector{UInt8},Ptr{UInt8}}) = StatStruct("", buf) -StatStruct(desc::Union{AbstractString, OS_HANDLE}, buf::Union{Vector{UInt8},Ptr{UInt8}}) = StatStruct( +StatStruct() = StatStruct("", 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, Base.UV_ENOENT) +StatStruct(buf::Union{Memory{UInt8},Vector{UInt8},Ptr{UInt8}}, ioerrno::Int32) = StatStruct("", buf, ioerrno) +StatStruct(desc::Union{AbstractString, OS_HANDLE}, buf::Union{Memory{UInt8},Vector{UInt8},Ptr{UInt8}}, ioerrno::Int32) = StatStruct( desc isa OS_HANDLE ? desc : String(desc), - ccall(:jl_stat_dev, UInt32, (Ptr{UInt8},), buf), - ccall(:jl_stat_ino, UInt32, (Ptr{UInt8},), buf), - ccall(:jl_stat_mode, UInt32, (Ptr{UInt8},), buf), - ccall(:jl_stat_nlink, UInt32, (Ptr{UInt8},), buf), - ccall(:jl_stat_uid, UInt32, (Ptr{UInt8},), buf), - ccall(:jl_stat_gid, UInt32, (Ptr{UInt8},), buf), - ccall(:jl_stat_rdev, UInt32, (Ptr{UInt8},), buf), - ccall(:jl_stat_size, UInt64, (Ptr{UInt8},), buf), - ccall(:jl_stat_blksize, UInt64, (Ptr{UInt8},), buf), - ccall(:jl_stat_blocks, UInt64, (Ptr{UInt8},), buf), - ccall(:jl_stat_mtime, Float64, (Ptr{UInt8},), buf), - ccall(:jl_stat_ctime, Float64, (Ptr{UInt8},), buf), + ioerrno != 0 ? zero(UInt32) : ccall(:jl_stat_dev, UInt32, (Ptr{UInt8},), buf), + ioerrno != 0 ? zero(UInt32) : ccall(:jl_stat_ino, UInt32, (Ptr{UInt8},), buf), + ioerrno != 0 ? zero(UInt32) : ccall(:jl_stat_mode, UInt32, (Ptr{UInt8},), buf), + ioerrno != 0 ? zero(UInt32) : ccall(:jl_stat_nlink, UInt32, (Ptr{UInt8},), buf), + ioerrno != 0 ? zero(UInt32) : ccall(:jl_stat_uid, UInt32, (Ptr{UInt8},), buf), + ioerrno != 0 ? zero(UInt32) : ccall(:jl_stat_gid, UInt32, (Ptr{UInt8},), buf), + ioerrno != 0 ? zero(UInt32) : ccall(:jl_stat_rdev, UInt32, (Ptr{UInt8},), buf), + ioerrno != 0 ? zero(UInt64) : ccall(:jl_stat_size, UInt64, (Ptr{UInt8},), buf), + ioerrno != 0 ? zero(UInt64) : ccall(:jl_stat_blksize, UInt64, (Ptr{UInt8},), buf), + ioerrno != 0 ? zero(UInt64) : ccall(:jl_stat_blocks, UInt64, (Ptr{UInt8},), buf), + ioerrno != 0 ? zero(Float64) : ccall(:jl_stat_mtime, Float64, (Ptr{UInt8},), buf), + ioerrno != 0 ? zero(Float64) : ccall(:jl_stat_ctime, Float64, (Ptr{UInt8},), buf), + ioerrno ) function iso_datetime_with_relative(t, tnow) @@ -130,35 +132,41 @@ end function show_statstruct(io::IO, st::StatStruct, oneline::Bool) print(io, oneline ? "StatStruct(" : "StatStruct for ") show(io, st.desc) - oneline || print(io, "\n ") - print(io, " size: ", st.size, " bytes") - oneline || print(io, "\n") - print(io, " device: ", st.device) - oneline || print(io, "\n ") - print(io, " inode: ", st.inode) - oneline || print(io, "\n ") - print(io, " mode: 0o", string(filemode(st), base = 8, pad = 6), " (", filemode_string(st), ")") - oneline || print(io, "\n ") - print(io, " nlink: ", st.nlink) - oneline || print(io, "\n ") - print(io, " uid: $(st.uid)") - username = getusername(st.uid) - username === nothing || print(io, " (", username, ")") - oneline || print(io, "\n ") - print(io, " gid: ", st.gid) - groupname = getgroupname(st.gid) - groupname === nothing || print(io, " (", groupname, ")") - oneline || print(io, "\n ") - print(io, " rdev: ", st.rdev) - oneline || print(io, "\n ") - print(io, " blksz: ", st.blksize) - oneline || print(io, "\n") - print(io, " blocks: ", st.blocks) - tnow = round(UInt, time()) - oneline || print(io, "\n ") - print(io, " mtime: ", iso_datetime_with_relative(st.mtime, tnow)) - oneline || print(io, "\n ") - print(io, " ctime: ", iso_datetime_with_relative(st.ctime, tnow)) + code = st.ioerrno + if code != 0 + print(io, oneline ? " " : "\n ") + print(io, Base.uverrorname(code), ": ", Base.struverror(code)) + else + oneline || print(io, "\n ") + print(io, " size: ", st.size, " bytes") + oneline || print(io, "\n") + print(io, " device: ", st.device) + oneline || print(io, "\n ") + print(io, " inode: ", st.inode) + oneline || print(io, "\n ") + print(io, " mode: 0o", string(filemode(st), base = 8, pad = 6), " (", filemode_string(st), ")") + oneline || print(io, "\n ") + print(io, " nlink: ", st.nlink) + oneline || print(io, "\n ") + print(io, " uid: $(st.uid)") + username = getusername(st.uid) + username === nothing || print(io, " (", username, ")") + oneline || print(io, "\n ") + print(io, " gid: ", st.gid) + groupname = getgroupname(st.gid) + groupname === nothing || print(io, " (", groupname, ")") + oneline || print(io, "\n ") + print(io, " rdev: ", st.rdev) + oneline || print(io, "\n ") + print(io, " blksz: ", st.blksize) + oneline || print(io, "\n") + print(io, " blocks: ", st.blocks) + tnow = round(UInt, time()) + oneline || print(io, "\n ") + print(io, " mtime: ", iso_datetime_with_relative(st.mtime, tnow)) + oneline || print(io, "\n ") + print(io, " ctime: ", iso_datetime_with_relative(st.ctime, tnow)) + end oneline && print(io, ")") return nothing end @@ -168,18 +176,13 @@ show(io::IO, ::MIME"text/plain", st::StatStruct) = show_statstruct(io, st, false # stat & lstat functions +checkstat(s::StatStruct) = Int(s.ioerrno) in (0, Base.UV_ENOENT, Base.UV_ENOTDIR, Base.UV_EINVAL) ? s : uv_error(string("stat(", repr(s.desc), ")"), s.ioerrno) + macro stat_call(sym, arg1type, arg) return quote - stat_buf = zeros(UInt8, Int(ccall(:jl_sizeof_stat, Int32, ()))) + stat_buf = fill!(Memory{UInt8}(undef, Int(ccall(:jl_sizeof_stat, Int32, ()))), 0x00) r = ccall($(Expr(:quote, sym)), Int32, ($(esc(arg1type)), Ptr{UInt8}), $(esc(arg)), stat_buf) - if !(r in (0, Base.UV_ENOENT, Base.UV_ENOTDIR, Base.UV_EINVAL)) - uv_error(string("stat(", repr($(esc(arg))), ")"), r) - end - st = StatStruct($(esc(arg)), stat_buf) - if ispath(st) != (r == 0) - error("stat returned zero type for a valid path") - end - return st + return checkstat(StatStruct($(esc(arg)), stat_buf, r)) end end @@ -334,7 +337,7 @@ Return `true` if a valid filesystem entity exists at `path`, otherwise returns `false`. This is the generalization of [`isfile`](@ref), [`isdir`](@ref) etc. """ -ispath(st::StatStruct) = filemode(st) & 0xf000 != 0x0000 +ispath(st::StatStruct) = st.ioerrno == 0 function ispath(path::String) # We use `access()` and `F_OK` to determine if a given path exists. `F_OK` comes from `unistd.h`. F_OK = 0x00 diff --git a/src/sys.c b/src/sys.c index b54edc32b32b6..fa9054bb93e9a 100644 --- a/src/sys.c +++ b/src/sys.c @@ -102,7 +102,6 @@ JL_DLLEXPORT int32_t jl_nb_available(ios_t *s) // --- dir/file stuff --- -JL_DLLEXPORT int jl_sizeof_uv_fs_t(void) { return sizeof(uv_fs_t); } JL_DLLEXPORT char *jl_uv_fs_t_ptr(uv_fs_t *req) { return (char*)req->ptr; } JL_DLLEXPORT char *jl_uv_fs_t_path(uv_fs_t *req) { return (char*)req->path; } diff --git a/stdlib/FileWatching/src/FileWatching.jl b/stdlib/FileWatching/src/FileWatching.jl index 0c987ad01c828..4ea6fcedd59bb 100644 --- a/stdlib/FileWatching/src/FileWatching.jl +++ b/stdlib/FileWatching/src/FileWatching.jl @@ -22,11 +22,11 @@ export trymkpidlock import Base: @handle_as, wait, close, eventloop, notify_error, IOError, - _sizeof_uv_poll, _sizeof_uv_fs_poll, _sizeof_uv_fs_event, _uv_hook_close, uv_error, _UVError, - iolock_begin, iolock_end, associate_julia_struct, disassociate_julia_struct, - preserve_handle, unpreserve_handle, isreadable, iswritable, isopen, - |, getproperty, propertynames -import Base.Filesystem.StatStruct + uv_req_data, uv_req_set_data, associate_julia_struct, disassociate_julia_struct, + _sizeof_uv_poll, _sizeof_uv_fs, _sizeof_uv_fs_event, _uv_hook_close, uv_error, _UVError, + iolock_begin, iolock_end, preserve_handle, unpreserve_handle, + isreadable, iswritable, isopen, |, getproperty, propertynames +import Base.Filesystem: StatStruct, uv_fs_req_cleanup if Sys.iswindows() import Base.WindowsRawSocket end @@ -126,31 +126,30 @@ mutable struct FolderMonitor end end +# this is similar to uv_fs_poll, but strives to avoid the design mistakes that make it unsuitable for any usable purpose +# https://github.com/libuv/libuv/issues/4543 mutable struct PollingFileWatcher - @atomic handle::Ptr{Cvoid} file::String - interval::UInt32 - notify::Base.ThreadSynchronizer - active::Bool - curr_error::Int32 - curr_stat::StatStruct + interval::Float64 + const notify::Base.ThreadSynchronizer # lock protects all fields which can be changed (including interval and file, if you really must) + timer::Union{Nothing,Timer} + const stat_req::Memory{UInt8} + active::Bool # whether there is already an uv_fspollcb in-flight, so to speak + closed::Bool # whether the user has explicitly destroyed this + ioerrno::Int32 # the stat errno as of the last result + prev_stat::StatStruct # the stat as of the last successful result PollingFileWatcher(file::AbstractString, interval::Float64=5.007) = PollingFileWatcher(String(file), interval) function PollingFileWatcher(file::String, interval::Float64=5.007) # same default as nodejs - handle = Libc.malloc(_sizeof_uv_fs_poll) - this = new(handle, file, round(UInt32, interval * 1000), Base.ThreadSynchronizer(), false, 0, StatStruct()) - associate_julia_struct(handle, this) - iolock_begin() - err = ccall(:uv_fs_poll_init, Int32, (Ptr{Cvoid}, Ptr{Cvoid}), eventloop(), handle) - if err != 0 - Libc.free(handle) - throw(_UVError("PollingFileWatcher", err)) - end - finalizer(uvfinalize, this) - iolock_end() + stat_req = Memory{UInt8}(undef, Int(_sizeof_uv_fs)) + this = new(file, interval, Base.ThreadSynchronizer(), nothing, stat_req, false, false, 0, StatStruct()) + uv_req_set_data(stat_req, this) + wait(this) # initialize with the current stat before return return this end end +Base.stat(pfw::PollingFileWatcher) = Base.checkstat(@lock pfw.notify pfw.prev_stat) + mutable struct _FDWatcher @atomic handle::Ptr{Cvoid} fdnum::Int # this is NOT the file descriptor @@ -327,7 +326,7 @@ function close(t::FDWatcher) close(t.watcher, mask) end -function uvfinalize(uv::Union{FileMonitor, FolderMonitor, PollingFileWatcher}) +function uvfinalize(uv::Union{FileMonitor, FolderMonitor}) iolock_begin() if uv.handle != C_NULL disassociate_julia_struct(uv) # close (and free) without notify @@ -336,7 +335,7 @@ function uvfinalize(uv::Union{FileMonitor, FolderMonitor, PollingFileWatcher}) iolock_end() end -function close(t::Union{FileMonitor, FolderMonitor, PollingFileWatcher}) +function close(t::Union{FileMonitor, FolderMonitor}) iolock_begin() if t.handle != C_NULL ccall(:jl_close_uv, Cvoid, (Ptr{Cvoid},), t.handle) @@ -344,6 +343,21 @@ function close(t::Union{FileMonitor, FolderMonitor, PollingFileWatcher}) iolock_end() end +function close(pfw::PollingFileWatcher) + timer = nothing + lock(pfw.notify) + try + pfw.closed = true + notify(pfw.notify, false) + timer = pfw.timer + pfw.timer = nothing + finally + unlock(pfw.notify) + end + timer === nothing || close(timer) + nothing +end + function _uv_hook_close(uv::_FDWatcher) # fyi: jl_atexit_hook can cause this to get called too Libc.free(@atomicswap :monotonic uv.handle = C_NULL) @@ -351,18 +365,6 @@ function _uv_hook_close(uv::_FDWatcher) nothing end -function _uv_hook_close(uv::PollingFileWatcher) - lock(uv.notify) - try - uv.active = false - Libc.free(@atomicswap :monotonic uv.handle = C_NULL) - notify(uv.notify, StatStruct()) - finally - unlock(uv.notify) - end - nothing -end - function _uv_hook_close(uv::FileMonitor) lock(uv.notify) try @@ -388,7 +390,7 @@ end isopen(fm::FileMonitor) = fm.handle != C_NULL isopen(fm::FolderMonitor) = fm.handle != C_NULL -isopen(pfw::PollingFileWatcher) = pfw.handle != C_NULL +isopen(pfw::PollingFileWatcher) = !pfw.closed isopen(pfw::_FDWatcher) = pfw.refcount != (0, 0) isopen(pfw::FDWatcher) = !pfw.mask.timedout @@ -449,21 +451,50 @@ function uv_pollcb(handle::Ptr{Cvoid}, status::Int32, events::Int32) nothing end -function uv_fspollcb(handle::Ptr{Cvoid}, status::Int32, prev::Ptr, curr::Ptr) - t = @handle_as handle PollingFileWatcher - old_status = t.curr_error - t.curr_error = status - if status == 0 - t.curr_stat = StatStruct(convert(Ptr{UInt8}, curr)) - end - if status == 0 || status != old_status - prev_stat = StatStruct(convert(Ptr{UInt8}, prev)) - lock(t.notify) - try - notify(t.notify, prev_stat) - finally - unlock(t.notify) +function uv_fspollcb(req::Ptr{Cvoid}) + pfw = unsafe_pointer_to_objref(uv_req_data(req))::PollingFileWatcher + pfw.active = false + unpreserve_handle(pfw) + @assert pointer(pfw.stat_req) == req + r = Int32(ccall(:uv_fs_get_result, Cssize_t, (Ptr{Cvoid},), req)) + statbuf = ccall(:uv_fs_get_statbuf, Ptr{UInt8}, (Ptr{Cvoid},), req) + curr_stat = StatStruct(pfw.file, statbuf, r) + uv_fs_req_cleanup(req) + lock(pfw.notify) + try + if !isempty(pfw.notify) # discard the update if nobody watching + if pfw.ioerrno != r || (r == 0 && pfw.prev_stat != curr_stat) + if r == 0 + pfw.prev_stat = curr_stat + end + pfw.ioerrno = r + notify(pfw.notify, true) + end + pfw.timer = Timer(pfw.interval) do t + # async task + iolock_begin() + lock(pfw.notify) + try + if pfw.timer === t # use identity check to test if this callback is stale by the time we got the lock + pfw.timer = nothing + @assert !pfw.active + if isopen(pfw) && !isempty(pfw.notify) + preserve_handle(pfw) + err = ccall(:uv_fs_stat, Cint, (Ptr{Cvoid}, Ptr{Cvoid}, Cstring, Ptr{Cvoid}), + eventloop(), pfw.stat_req, pfw.file, uv_jl_fspollcb) + err == 0 || notify(pfw.notify, _UVError("PollingFileWatcher (start)", err), error=true) # likely just ENOMEM + pfw.active = true + end + end + finally + unlock(pfw.notify) + end + iolock_end() + nothing + end end + finally + unlock(pfw.notify) end nothing end @@ -475,7 +506,7 @@ global uv_jl_fseventscb_folder::Ptr{Cvoid} function __init__() global uv_jl_pollcb = @cfunction(uv_pollcb, Cvoid, (Ptr{Cvoid}, Cint, Cint)) - global uv_jl_fspollcb = @cfunction(uv_fspollcb, Cvoid, (Ptr{Cvoid}, Cint, Ptr{Cvoid}, Ptr{Cvoid})) + global uv_jl_fspollcb = @cfunction(uv_fspollcb, Cvoid, (Ptr{Cvoid},)) global uv_jl_fseventscb_file = @cfunction(uv_fseventscb_file, Cvoid, (Ptr{Cvoid}, Ptr{Int8}, Int32, Int32)) global uv_jl_fseventscb_folder = @cfunction(uv_fseventscb_folder, Cvoid, (Ptr{Cvoid}, Ptr{Int8}, Int32, Int32)) @@ -504,35 +535,6 @@ function start_watching(t::_FDWatcher) nothing end -function start_watching(t::PollingFileWatcher) - iolock_begin() - t.handle == C_NULL && throw(ArgumentError("PollingFileWatcher is closed")) - if !t.active - uv_error("PollingFileWatcher (start)", - ccall(:uv_fs_poll_start, Int32, (Ptr{Cvoid}, Ptr{Cvoid}, Cstring, UInt32), - t.handle, uv_jl_fspollcb::Ptr{Cvoid}, t.file, t.interval)) - t.active = true - end - iolock_end() - nothing -end - -function stop_watching(t::PollingFileWatcher) - iolock_begin() - lock(t.notify) - try - if t.active && isempty(t.notify) - t.active = false - uv_error("PollingFileWatcher (stop)", - ccall(:uv_fs_poll_stop, Int32, (Ptr{Cvoid},), t.handle)) - end - finally - unlock(t.notify) - end - iolock_end() - nothing -end - function start_watching(t::FileMonitor) iolock_begin() t.handle == C_NULL && throw(ArgumentError("FileMonitor is closed")) @@ -640,28 +642,65 @@ end function wait(pfw::PollingFileWatcher) iolock_begin() - preserve_handle(pfw) lock(pfw.notify) - local prevstat + prevstat = pfw.prev_stat + havechange = false + timer = nothing try - start_watching(pfw) + # we aren't too strict about the first interval after `wait`, but rather always + # check right away to see if it had immediately changed again, and then repeatedly + # after interval again until success + pfw.closed && throw(ArgumentError("PollingFileWatcher is closed")) + timer = pfw.timer + pfw.timer = nothing # disable Timer callback + # start_watching + if !pfw.active + preserve_handle(pfw) + err = ccall(:uv_fs_stat, Cint, (Ptr{Cvoid}, Ptr{Cvoid}, Cstring, Ptr{Cvoid}), + eventloop(), pfw.stat_req, pfw.file, uv_jl_fspollcb) + err == 0 || uv_error("PollingFileWatcher (start)", err) # likely just ENOMEM + pfw.active = true + end iolock_end() - prevstat = wait(pfw.notify)::StatStruct + havechange = wait(pfw.notify)::Bool unlock(pfw.notify) iolock_begin() - lock(pfw.notify) - finally - unlock(pfw.notify) - unpreserve_handle(pfw) + catch + # stop_watching: cleanup any timers from before or after starting this wait before it failed, if there are no other watchers + latetimer = nothing + try + if isempty(pfw.notify) + latetimer = pfw.timer + pfw.timer = nothing + end + finally + unlock(pfw.notify) + end + if timer !== nothing || latetimer !== nothing + iolock_end() + timer === nothing || close(timer) + latetimer === nothing || close(latetimer) + iolock_begin() + end + rethrow() end - stop_watching(pfw) iolock_end() - if pfw.handle == C_NULL + timer === nothing || close(timer) # cleanup resources so we don't hang on exit + if !havechange # user canceled by calling close return prevstat, EOFError() - elseif pfw.curr_error != 0 - return prevstat, _UVError("PollingFileWatcher", pfw.curr_error) + end + # grab the most up-to-date stat result as of this time, even if it was a bit newer than the notify call + lock(pfw.notify) + currstat = pfw.prev_stat + ioerrno = pfw.ioerrno + unlock(pfw.notify) + if ioerrno == 0 + @assert currstat.ioerrno == 0 + return prevstat, currstat + elseif ioerrno in (Base.UV_ENOENT, Base.UV_ENOTDIR, Base.UV_EINVAL) + return prevstat, StatStruct(pfw.file, Ptr{UInt8}(0), ioerrno) else - return prevstat, pfw.curr_stat + return prevstat, _UVError("PollingFileWatcher", ioerrno) end end @@ -880,9 +919,9 @@ The `previous` status is always a `StatStruct`, but it may have all of the field The `current` status object may be a `StatStruct`, an `EOFError` (indicating the timeout elapsed), or some other `Exception` subtype (if the `stat` operation failed - for example, if the path does not exist). -To determine when a file was modified, compare `current isa StatStruct && mtime(prev) != mtime(current)` to detect -notification of changes. However, using [`watch_file`](@ref) for this operation is preferred, since -it is more reliable and efficient, although in some situations it may not be available. +To determine when a file was modified, compare `!(current isa StatStruct && prev == current)` to detect +notification of changes to the mtime or inode. However, using [`watch_file`](@ref) for this operation +is preferred, since it is more reliable and efficient, although in some situations it may not be available. """ function poll_file(s::AbstractString, interval_seconds::Real=5.007, timeout_s::Real=-1) pfw = PollingFileWatcher(s, Float64(interval_seconds)) @@ -893,12 +932,7 @@ function poll_file(s::AbstractString, interval_seconds::Real=5.007, timeout_s::R close(pfw) end end - statdiff = wait(pfw) - if isa(statdiff[2], IOError) - # file didn't initially exist, continue watching for it to be created (or the error to change) - statdiff = wait(pfw) - end - return statdiff + return wait(pfw) finally close(pfw) @isdefined(timer) && close(timer) diff --git a/stdlib/FileWatching/test/runtests.jl b/stdlib/FileWatching/test/runtests.jl index 2592aea024386..c9d7a4317fd08 100644 --- a/stdlib/FileWatching/test/runtests.jl +++ b/stdlib/FileWatching/test/runtests.jl @@ -2,6 +2,7 @@ using Test, FileWatching using Base: uv_error, Experimental +using Base.Filesystem: StatStruct @testset "FileWatching" begin @@ -218,7 +219,7 @@ function test_timeout(tval) @async test_file_poll(channel, 10, tval) tr = take!(channel) end - @test tr[1] === Base.Filesystem.StatStruct() && tr[2] === EOFError() + @test ispath(tr[1]::StatStruct) && tr[2] === EOFError() @test tval <= t_elapsed end @@ -231,7 +232,7 @@ function test_touch(slval) write(f, "Hello World\n") close(f) tr = take!(channel) - @test ispath(tr[1]) && ispath(tr[2]) + @test ispath(tr[1]::StatStruct) && ispath(tr[2]::StatStruct) fetch(t) end @@ -435,8 +436,8 @@ end @test_throws(Base._UVError("FolderMonitor (start)", Base.UV_ENOENT), watch_folder("____nonexistent_file", 10)) @test(@elapsed( - @test(poll_file("____nonexistent_file", 1, 3.1) === - (Base.Filesystem.StatStruct(), EOFError()))) > 3) + @test(poll_file("____nonexistent_file", 1, 3.1) == + (StatStruct(), EOFError()))) > 3) unwatch_folder(dir) @test isempty(FileWatching.watched_folders) diff --git a/test/file.jl b/test/file.jl index de258c92e02bc..a4262c4eaaa21 100644 --- a/test/file.jl +++ b/test/file.jl @@ -2128,6 +2128,16 @@ Base.joinpath(x::URI50890) = URI50890(x.f) @test !isnothing(Base.Filesystem.getusername(s.uid)) @test !isnothing(Base.Filesystem.getgroupname(s.gid)) end + s = Base.Filesystem.StatStruct() + stat_show_str = sprint(show, s) + stat_show_str_multi = sprint(show, MIME("text/plain"), s) + @test startswith(stat_show_str, "StatStruct(\"\" ENOENT: ") && endswith(stat_show_str, ")") + @test startswith(stat_show_str_multi, "StatStruct for \"\"\n ENOENT: ") && !endswith(stat_show_str_multi, r"\s") + s = Base.Filesystem.StatStruct("my/test", Ptr{UInt8}(0), Int32(Base.UV_ENOTDIR)) + stat_show_str = sprint(show, s) + stat_show_str_multi = sprint(show, MIME("text/plain"), s) + @test startswith(stat_show_str, "StatStruct(\"my/test\" ENOTDIR: ") && endswith(stat_show_str, ")") + @test startswith(stat_show_str_multi, "StatStruct for \"my/test\"\n ENOTDIR: ") && !endswith(stat_show_str_multi, r"\s") end @testset "diskstat() works" begin From b6e0136466396bc781406c0ab2f036f64cc818d7 Mon Sep 17 00:00:00 2001 From: Jameson Nash Date: Thu, 26 Sep 2024 13:57:52 -0400 Subject: [PATCH 2/3] [FileWatching] fix FileMonitor similarly and improve pidfile reliability Previously pidfile used the same poll_interval as sleep to detect if this code made any concurrency mistakes, but we do not really need to do that once FileMonitor is fixed to be reliable in the presence of parallel concurrency (instead of using watch_file). --- stdlib/FileWatching/src/FileWatching.jl | 108 ++++++++++-------------- stdlib/FileWatching/src/pidfile.jl | 46 +++++++--- stdlib/FileWatching/test/runtests.jl | 11 +-- 3 files changed, 84 insertions(+), 81 deletions(-) diff --git a/stdlib/FileWatching/src/FileWatching.jl b/stdlib/FileWatching/src/FileWatching.jl index 4ea6fcedd59bb..b24f352943ec5 100644 --- a/stdlib/FileWatching/src/FileWatching.jl +++ b/stdlib/FileWatching/src/FileWatching.jl @@ -38,13 +38,13 @@ const UV_CHANGE = Int32(2) struct FileEvent renamed::Bool changed::Bool - timedout::Bool + timedout::Bool # aka canceled FileEvent(r::Bool, c::Bool, t::Bool) = new(r, c, t) end FileEvent() = FileEvent(false, false, true) FileEvent(flags::Integer) = FileEvent((flags & UV_RENAME) != 0, (flags & UV_CHANGE) != 0, - false) + iszero(flags)) |(a::FileEvent, b::FileEvent) = FileEvent(a.renamed | b.renamed, a.changed | b.changed, @@ -80,23 +80,26 @@ iswritable(f::FDEvent) = f.writable mutable struct FileMonitor @atomic handle::Ptr{Cvoid} - file::String - notify::Base.ThreadSynchronizer - events::Int32 - active::Bool + const file::String + const notify::Base.ThreadSynchronizer + events::Int32 # accumulator for events that occurred since the last wait call, similar to Event with autoreset + ioerrno::Int32 # record the error, if any occurs (unlikely) FileMonitor(file::AbstractString) = FileMonitor(String(file)) function FileMonitor(file::String) handle = Libc.malloc(_sizeof_uv_fs_event) - this = new(handle, file, Base.ThreadSynchronizer(), 0, false) + this = new(handle, file, Base.ThreadSynchronizer(), 0, 0) associate_julia_struct(handle, this) iolock_begin() err = ccall(:uv_fs_event_init, Cint, (Ptr{Cvoid}, Ptr{Cvoid}), eventloop(), handle) if err != 0 Libc.free(handle) - throw(_UVError("FileMonitor", err)) + uv_error("FileMonitor", err) end - iolock_end() finalizer(uvfinalize, this) + uv_error("FileMonitor (start)", + ccall(:uv_fs_event_start, Int32, (Ptr{Cvoid}, Ptr{Cvoid}, Cstring, Int32), + this.handle, uv_jl_fseventscb_file::Ptr{Cvoid}, file, 0)) + iolock_end() return this end end @@ -104,8 +107,8 @@ end mutable struct FolderMonitor @atomic handle::Ptr{Cvoid} # notify::Channel{Any} # eltype = Union{Pair{String, FileEvent}, IOError} - notify::Base.ThreadSynchronizer - channel::Vector{Any} # eltype = Pair{String, FileEvent} + const notify::Base.ThreadSynchronizer + const channel::Vector{Any} # eltype = Pair{String, FileEvent} FolderMonitor(folder::AbstractString) = FolderMonitor(String(folder)) function FolderMonitor(folder::String) handle = Libc.malloc(_sizeof_uv_fs_event) @@ -152,9 +155,9 @@ Base.stat(pfw::PollingFileWatcher) = Base.checkstat(@lock pfw.notify pfw.prev_st mutable struct _FDWatcher @atomic handle::Ptr{Cvoid} - fdnum::Int # this is NOT the file descriptor + const fdnum::Int # this is NOT the file descriptor refcount::Tuple{Int, Int} - notify::Base.ThreadSynchronizer + const notify::Base.ThreadSynchronizer events::Int32 active::Tuple{Bool, Bool} @@ -275,7 +278,7 @@ end mutable struct FDWatcher # WARNING: make sure `close` has been manually called on this watcher before closing / destroying `fd` - watcher::_FDWatcher + const watcher::_FDWatcher mask::FDEvent function FDWatcher(fd::RawFD, readable::Bool, writable::Bool) return FDWatcher(fd, FDEvent(readable, writable, false, false)) @@ -368,9 +371,8 @@ end function _uv_hook_close(uv::FileMonitor) lock(uv.notify) try - uv.active = false Libc.free(@atomicswap :monotonic uv.handle = C_NULL) - notify(uv.notify, FileEvent()) + notify(uv.notify) finally unlock(uv.notify) end @@ -399,10 +401,12 @@ function uv_fseventscb_file(handle::Ptr{Cvoid}, filename::Ptr, events::Int32, st lock(t.notify) try if status != 0 + t.ioerrno = status notify_error(t.notify, _UVError("FileMonitor", status)) - else - t.events |= events - notify(t.notify, FileEvent(events)) + uvfinalize(t) + elseif events != t.events + events = t.events |= events + notify(t.notify, all=false) end finally unlock(t.notify) @@ -535,35 +539,6 @@ function start_watching(t::_FDWatcher) nothing end -function start_watching(t::FileMonitor) - iolock_begin() - t.handle == C_NULL && throw(ArgumentError("FileMonitor is closed")) - if !t.active - uv_error("FileMonitor (start)", - ccall(:uv_fs_event_start, Int32, (Ptr{Cvoid}, Ptr{Cvoid}, Cstring, Int32), - t.handle, uv_jl_fseventscb_file::Ptr{Cvoid}, t.file, 0)) - t.active = true - end - iolock_end() - nothing -end - -function stop_watching(t::FileMonitor) - iolock_begin() - lock(t.notify) - try - if t.active && isempty(t.notify) - t.active = false - uv_error("FileMonitor (stop)", - ccall(:uv_fs_event_stop, Int32, (Ptr{Cvoid},), t.handle)) - end - finally - unlock(t.notify) - end - iolock_end() - nothing -end - # n.b. this _wait may return spuriously early with a timedout event function _wait(fdw::_FDWatcher, mask::FDEvent) iolock_begin() @@ -705,26 +680,23 @@ function wait(pfw::PollingFileWatcher) end function wait(m::FileMonitor) - iolock_begin() + m.handle == C_NULL && throw(EOFError()) preserve_handle(m) lock(m.notify) - local events try - start_watching(m) - iolock_end() - events = wait(m.notify)::FileEvent - events |= FileEvent(m.events) - m.events = 0 - unlock(m.notify) - iolock_begin() - lock(m.notify) + while true + m.handle == C_NULL && throw(EOFError()) + events = @atomicswap :not_atomic m.events = 0 + events == 0 || return FileEvent(events) + if m.ioerrno != 0 + uv_error("FileMonitor", m.ioerrno) + end + wait(m.notify) + end finally unlock(m.notify) unpreserve_handle(m) end - stop_watching(m) - iolock_end() - return events end function wait(m::FolderMonitor) @@ -743,6 +715,7 @@ function wait(m::FolderMonitor) end return evt::Pair{String, FileEvent} end +Base.take!(m::FolderMonitor) = wait(m) # Channel-like API """ @@ -823,7 +796,12 @@ function watch_file(s::String, timeout_s::Float64=-1.0) close(fm) end end - return wait(fm) + try + return wait(fm) + catch ex + ex isa EOFError && return FileEvent() + rethrow() + end finally close(fm) @isdefined(timer) && close(timer) @@ -851,7 +829,7 @@ This behavior of this function varies slightly across platforms. See """ watch_folder(s::AbstractString, timeout_s::Real=-1) = watch_folder(String(s), timeout_s) function watch_folder(s::String, timeout_s::Real=-1) - fm = get!(watched_folders, s) do + fm = @lock watched_folders get!(watched_folders[], s) do return FolderMonitor(s) end local timer @@ -898,12 +876,12 @@ It is not recommended to do this while another task is waiting for """ unwatch_folder(s::AbstractString) = unwatch_folder(String(s)) function unwatch_folder(s::String) - fm = pop!(watched_folders, s, nothing) + fm = @lock watched_folders pop!(watched_folders[], s, nothing) fm === nothing || close(fm) nothing end -const watched_folders = Dict{String, FolderMonitor}() +const watched_folders = Lockable(Dict{String, FolderMonitor}()) """ poll_file(path::AbstractString, interval_s::Real=5.007, timeout_s::Real=-1) -> (previous::StatStruct, current) diff --git a/stdlib/FileWatching/src/pidfile.jl b/stdlib/FileWatching/src/pidfile.jl index 4c821a3d897e4..95b8f20face29 100644 --- a/stdlib/FileWatching/src/pidfile.jl +++ b/stdlib/FileWatching/src/pidfile.jl @@ -4,14 +4,14 @@ module Pidfile export mkpidlock, trymkpidlock using Base: - IOError, UV_EEXIST, UV_ESRCH, + IOError, UV_EEXIST, UV_ESRCH, UV_ENOENT, Process using Base.Filesystem: File, open, JL_O_CREAT, JL_O_RDWR, JL_O_RDONLY, JL_O_EXCL, rename, samefile, path_separator -using ..FileWatching: watch_file +using ..FileWatching: FileMonitor using Base.Sys: iswindows """ @@ -256,19 +256,43 @@ function open_exclusive(path::String; end end # fall-back: wait for the lock - + watch = Lockable(Core.Box(nothing)) while true - # start the file-watcher prior to checking for the pidfile existence - t = @async try - watch_file(path, poll_interval) + # now try again to create it + # try to start the file-watcher prior to checking for the pidfile existence + watch = try + FileMonitor(path) catch ex isa(ex, IOError) || rethrow(ex) - sleep(poll_interval) # if the watch failed, convert to just doing a sleep + ex.code != UV_ENOENT # if the file was deleted in the meantime, don't sleep at all, even if the lock fails + end + timeout = nothing + if watch isa FileMonitor && stale_age > 0 + let watch = watch + timeout = Timer(stale_age) do t + close(watch) + end + end + end + try + file = tryopen_exclusive(path, mode) + file === nothing || return file + if watch isa FileMonitor + try + Base.wait(watch) # will time-out after stale_age passes + catch ex + isa(ex, EOFError) || isa(ex, IOError) || rethrow(ex) + end + end + if watch === true # if the watch failed, convert to just doing a sleep + sleep(poll_interval) + end + finally + # something changed about the path, so watch is now possibly monitoring the wrong file handle + # it will need to be recreated just before the next tryopen_exclusive attempt + timeout isa Timer && close(timeout) + watch isa FileMonitor && close(watch) end - # now try again to create it - file = tryopen_exclusive(path, mode) - file === nothing || return file - Base.wait(t) # sleep for a bit before trying again if stale_age > 0 && stale_pidfile(path, stale_age, refresh) # if the file seems stale, try to remove it before attempting again # set stale_age to zero so we won't attempt again, even if the attempt fails diff --git a/stdlib/FileWatching/test/runtests.jl b/stdlib/FileWatching/test/runtests.jl index c9d7a4317fd08..11df8849048f8 100644 --- a/stdlib/FileWatching/test/runtests.jl +++ b/stdlib/FileWatching/test/runtests.jl @@ -169,12 +169,13 @@ file = joinpath(dir, "afile.txt") # initialize a watch_folder instance and create afile.txt function test_init_afile() - @test isempty(FileWatching.watched_folders) + watched_folders = FileWatching.watched_folders + @test @lock watched_folders isempty(watched_folders[]) @test(watch_folder(dir, 0) == ("" => FileWatching.FileEvent())) @test @elapsed(@test(watch_folder(dir, 0) == ("" => FileWatching.FileEvent()))) <= 0.5 - @test length(FileWatching.watched_folders) == 1 + @test @lock(watched_folders, length(FileWatching.watched_folders[])) == 1 @test unwatch_folder(dir) === nothing - @test isempty(FileWatching.watched_folders) + @test @lock watched_folders isempty(watched_folders[]) @test 0.002 <= @elapsed(@test(watch_folder(dir, 0.004) == ("" => FileWatching.FileEvent()))) @test 0.002 <= @elapsed(@test(watch_folder(dir, 0.004) == ("" => FileWatching.FileEvent()))) <= 0.5 @test unwatch_folder(dir) === nothing @@ -204,7 +205,7 @@ function test_init_afile() @test unwatch_folder(dir) === nothing @test(watch_folder(dir, 0) == ("" => FileWatching.FileEvent())) @test 0.9 <= @elapsed(@test(watch_folder(dir, 1) == ("" => FileWatching.FileEvent()))) - @test length(FileWatching.watched_folders) == 1 + @test @lock(watched_folders, length(FileWatching.watched_folders[])) == 1 nothing end @@ -440,7 +441,7 @@ end (StatStruct(), EOFError()))) > 3) unwatch_folder(dir) -@test isempty(FileWatching.watched_folders) +@test @lock FileWatching.watched_folders isempty(FileWatching.watched_folders[]) rm(file) rm(dir) From f8d17e7ad4857ba3164ca1c4df8d118dbf42b429 Mon Sep 17 00:00:00 2001 From: Jameson Nash Date: Thu, 26 Sep 2024 15:04:26 -0400 Subject: [PATCH 3/3] [FileWatching] reorganize file and add docs --- stdlib/FileWatching/docs/src/index.md | 16 +- stdlib/FileWatching/src/FileWatching.jl | 386 +++++++++++++++--------- stdlib/FileWatching/test/runtests.jl | 6 +- 3 files changed, 248 insertions(+), 160 deletions(-) diff --git a/stdlib/FileWatching/docs/src/index.md b/stdlib/FileWatching/docs/src/index.md index 1b2212fcc5a28..15d4e39a45117 100644 --- a/stdlib/FileWatching/docs/src/index.md +++ b/stdlib/FileWatching/docs/src/index.md @@ -5,11 +5,17 @@ EditURL = "https://github.com/JuliaLang/julia/blob/master/stdlib/FileWatching/do # [File Events](@id lib-filewatching) ```@docs -FileWatching.poll_fd -FileWatching.poll_file -FileWatching.watch_file -FileWatching.watch_folder -FileWatching.unwatch_folder +poll_fd +poll_file +watch_file +watch_folder +unwatch_folder +``` +```@docs +FileMonitor +FolderMonitor +PollingFileWatcher +FDWatcher ``` # Pidfile diff --git a/stdlib/FileWatching/src/FileWatching.jl b/stdlib/FileWatching/src/FileWatching.jl index b24f352943ec5..7c743ce634193 100644 --- a/stdlib/FileWatching/src/FileWatching.jl +++ b/stdlib/FileWatching/src/FileWatching.jl @@ -6,7 +6,7 @@ Utilities for monitoring files and file descriptors for events. module FileWatching export - # one-shot API (returns results): + # one-shot API (returns results, race-y): watch_file, # efficient for small numbers of files watch_folder, # efficient for large numbers of files unwatch_folder, @@ -78,6 +78,134 @@ isreadable(f::FDEvent) = f.readable iswritable(f::FDEvent) = f.writable |(a::FDEvent, b::FDEvent) = FDEvent(getfield(a, :events) | getfield(b, :events)) +# Callback functions + +function uv_fseventscb_file(handle::Ptr{Cvoid}, filename::Ptr, events::Int32, status::Int32) + t = @handle_as handle FileMonitor + lock(t.notify) + try + if status != 0 + t.ioerrno = status + notify_error(t.notify, _UVError("FileMonitor", status)) + uvfinalize(t) + elseif events != t.events + events = t.events |= events + notify(t.notify, all=false) + end + finally + unlock(t.notify) + end + nothing +end + +function uv_fseventscb_folder(handle::Ptr{Cvoid}, filename::Ptr, events::Int32, status::Int32) + t = @handle_as handle FolderMonitor + lock(t.notify) + try + if status != 0 + notify_error(t.notify, _UVError("FolderMonitor", status)) + else + fname = (filename == C_NULL) ? "" : unsafe_string(convert(Cstring, filename)) + push!(t.channel, fname => FileEvent(events)) + notify(t.notify) + end + finally + unlock(t.notify) + end + nothing +end + +function uv_pollcb(handle::Ptr{Cvoid}, status::Int32, events::Int32) + t = @handle_as handle _FDWatcher + lock(t.notify) + try + if status != 0 + notify_error(t.notify, _UVError("FDWatcher", status)) + else + t.events |= events + if t.active[1] || t.active[2] + if isempty(t.notify) + # if we keep hearing about events when nobody appears to be listening, + # stop the poll to save cycles + t.active = (false, false) + ccall(:uv_poll_stop, Int32, (Ptr{Cvoid},), t.handle) + end + end + notify(t.notify, events) + end + finally + unlock(t.notify) + end + nothing +end + +function uv_fspollcb(req::Ptr{Cvoid}) + pfw = unsafe_pointer_to_objref(uv_req_data(req))::PollingFileWatcher + pfw.active = false + unpreserve_handle(pfw) + @assert pointer(pfw.stat_req) == req + r = Int32(ccall(:uv_fs_get_result, Cssize_t, (Ptr{Cvoid},), req)) + statbuf = ccall(:uv_fs_get_statbuf, Ptr{UInt8}, (Ptr{Cvoid},), req) + curr_stat = StatStruct(pfw.file, statbuf, r) + uv_fs_req_cleanup(req) + lock(pfw.notify) + try + if !isempty(pfw.notify) # must discard the update if nobody watching + if pfw.ioerrno != r || (r == 0 && pfw.prev_stat != curr_stat) + if r == 0 + pfw.prev_stat = curr_stat + end + pfw.ioerrno = r + notify(pfw.notify, true) + end + pfw.timer = Timer(pfw.interval) do t + # async task + iolock_begin() + lock(pfw.notify) + try + if pfw.timer === t # use identity check to test if this callback is stale by the time we got the lock + pfw.timer = nothing + @assert !pfw.active + if isopen(pfw) && !isempty(pfw.notify) + preserve_handle(pfw) + uv_jl_fspollcb = @cfunction(uv_fspollcb, Cvoid, (Ptr{Cvoid},)) + err = ccall(:uv_fs_stat, Cint, (Ptr{Cvoid}, Ptr{Cvoid}, Cstring, Ptr{Cvoid}), + eventloop(), pfw.stat_req, pfw.file, uv_jl_fspollcb::Ptr{Cvoid}) + err == 0 || notify(pfw.notify, _UVError("PollingFileWatcher (start)", err), error=true) # likely just ENOMEM + pfw.active = true + end + end + finally + unlock(pfw.notify) + end + iolock_end() + nothing + end + end + finally + unlock(pfw.notify) + end + nothing +end + +# Types + +""" + FileMonitor(path::AbstractString) + +Watch file or directory `path` (which must exist) for changes until a change occurs. This +function does not poll the file system and instead uses platform-specific functionality to +receive notifications from the operating system (e.g. via inotify on Linux). See the NodeJS +documentation linked below for details. + +`fm = FileMonitor(path)` acts like an auto-reset Event, so `wait(fm)` blocks until there has +been at least one event in the file originally at the given path and then returns an object +with boolean fields `renamed`, `changed`, `timedout` summarizing all changes that have +occurred since the last call to `wait` returned. + +This behavior of this function varies slightly across platforms. See + for more detailed information. +""" mutable struct FileMonitor @atomic handle::Ptr{Cvoid} const file::String @@ -96,6 +224,7 @@ mutable struct FileMonitor uv_error("FileMonitor", err) end finalizer(uvfinalize, this) + uv_jl_fseventscb_file = @cfunction(uv_fseventscb_file, Cvoid, (Ptr{Cvoid}, Ptr{Int8}, Int32, Int32)) uv_error("FileMonitor (start)", ccall(:uv_fs_event_start, Int32, (Ptr{Cvoid}, Ptr{Cvoid}, Cstring, Int32), this.handle, uv_jl_fseventscb_file::Ptr{Cvoid}, file, 0)) @@ -104,6 +233,23 @@ mutable struct FileMonitor end end + +""" + FolderMonitor(folder::AbstractString) + +Watch a file or directory `path` for changes until a change has occurred. This function does +not poll the file system and instead uses platform-specific functionality to receive +notifications from the operating system (e.g. via inotify on Linux). See the NodeJS +documentation linked below for details. + +This acts similar to a Channel, so calling `take!` (or `wait`) blocks until some change has +occurred. The `wait` function will return a pair where the first field is the name of the +changed file (if available) and the second field is an object with boolean fields `renamed` +and `changed`, giving the event that occurred on it. + +This behavior of this function varies slightly across platforms. See + for more detailed information. +""" mutable struct FolderMonitor @atomic handle::Ptr{Cvoid} # notify::Channel{Any} # eltype = Union{Pair{String, FileEvent}, IOError} @@ -121,6 +267,7 @@ mutable struct FolderMonitor throw(_UVError("FolderMonitor", err)) end finalizer(uvfinalize, this) + uv_jl_fseventscb_folder = @cfunction(uv_fseventscb_folder, Cvoid, (Ptr{Cvoid}, Ptr{Int8}, Int32, Int32)) uv_error("FolderMonitor (start)", ccall(:uv_fs_event_start, Int32, (Ptr{Cvoid}, Ptr{Cvoid}, Cstring, Int32), handle, uv_jl_fseventscb_folder::Ptr{Cvoid}, folder, 0)) @@ -131,6 +278,28 @@ end # this is similar to uv_fs_poll, but strives to avoid the design mistakes that make it unsuitable for any usable purpose # https://github.com/libuv/libuv/issues/4543 +""" + PollingFileWatcher(path::AbstractString, interval_s::Real=5.007) + +Monitor a file for changes by polling `stat` every `interval_s` seconds until a change +occurs or `timeout_s` seconds have elapsed. The `interval_s` should be a long period; the +default is 5.007 seconds. Call `stat` on it to get the most recent, but old, result. + +This acts like an auto-reset Event, so calling `wait` blocks until the `stat` result has +changed since the previous value captured upon entry to the `wait` call. The `wait` function +will return a pair of status objects `(previous, current)` once any `stat` change is +detected since the previous time that `wait` was called. The `previous` status is always a +`StatStruct`, but it may have all of the fields zeroed (indicating the file didn't +previously exist, or wasn't previously accessible). + +The `current` status object may be a `StatStruct`, an `EOFError` (if the wait is canceled by +closing this object), or some other `Exception` subtype (if the `stat` operation failed: for +example, if the path is removed). Note that `stat` value may be outdated if the file has +changed again multiple times. + +Using [`FileMonitor`](@ref) for this operation is preferred, since it is more reliable and +efficient, although in some situations it may not be available. +""" mutable struct PollingFileWatcher file::String interval::Float64 @@ -151,8 +320,6 @@ mutable struct PollingFileWatcher end end -Base.stat(pfw::PollingFileWatcher) = Base.checkstat(@lock pfw.notify pfw.prev_stat) - mutable struct _FDWatcher @atomic handle::Ptr{Cvoid} const fdnum::Int # this is NOT the file descriptor @@ -276,6 +443,25 @@ mutable struct _FDWatcher end end +""" + FDWatcher(fd::Union{RawFD,WindowsRawSocket}, readable::Bool, writable::Bool) + +Monitor a file descriptor `fd` for changes in the read or write availability. + +The keyword arguments determine which of read and/or write status should be monitored; at +least one of them must be set to `true`. + +The returned value is an object with boolean fields `readable`, `writable`, and `timedout`, +giving the result of the polling. + +This acts like a level-set event, so calling `wait` blocks until one of those conditions is +met, but then continues to return without blocking until the condition is cleared (either +there is no more to read, or no more space in the write buffer, or both). + +!!! warning + You must call `close` manually, when finished with this object, before the fd + argument is closed. Failure to do so risks serious crashes. +""" mutable struct FDWatcher # WARNING: make sure `close` has been manually called on this watcher before closing / destroying `fd` const watcher::_FDWatcher @@ -396,148 +582,7 @@ isopen(pfw::PollingFileWatcher) = !pfw.closed isopen(pfw::_FDWatcher) = pfw.refcount != (0, 0) isopen(pfw::FDWatcher) = !pfw.mask.timedout -function uv_fseventscb_file(handle::Ptr{Cvoid}, filename::Ptr, events::Int32, status::Int32) - t = @handle_as handle FileMonitor - lock(t.notify) - try - if status != 0 - t.ioerrno = status - notify_error(t.notify, _UVError("FileMonitor", status)) - uvfinalize(t) - elseif events != t.events - events = t.events |= events - notify(t.notify, all=false) - end - finally - unlock(t.notify) - end - nothing -end - -function uv_fseventscb_folder(handle::Ptr{Cvoid}, filename::Ptr, events::Int32, status::Int32) - t = @handle_as handle FolderMonitor - lock(t.notify) - try - if status != 0 - notify_error(t.notify, _UVError("FolderMonitor", status)) - else - fname = (filename == C_NULL) ? "" : unsafe_string(convert(Cstring, filename)) - push!(t.channel, fname => FileEvent(events)) - notify(t.notify) - end - finally - unlock(t.notify) - end - nothing -end - -function uv_pollcb(handle::Ptr{Cvoid}, status::Int32, events::Int32) - t = @handle_as handle _FDWatcher - lock(t.notify) - try - if status != 0 - notify_error(t.notify, _UVError("FDWatcher", status)) - else - t.events |= events - if t.active[1] || t.active[2] - if isempty(t.notify) - # if we keep hearing about events when nobody appears to be listening, - # stop the poll to save cycles - t.active = (false, false) - ccall(:uv_poll_stop, Int32, (Ptr{Cvoid},), t.handle) - end - end - notify(t.notify, events) - end - finally - unlock(t.notify) - end - nothing -end - -function uv_fspollcb(req::Ptr{Cvoid}) - pfw = unsafe_pointer_to_objref(uv_req_data(req))::PollingFileWatcher - pfw.active = false - unpreserve_handle(pfw) - @assert pointer(pfw.stat_req) == req - r = Int32(ccall(:uv_fs_get_result, Cssize_t, (Ptr{Cvoid},), req)) - statbuf = ccall(:uv_fs_get_statbuf, Ptr{UInt8}, (Ptr{Cvoid},), req) - curr_stat = StatStruct(pfw.file, statbuf, r) - uv_fs_req_cleanup(req) - lock(pfw.notify) - try - if !isempty(pfw.notify) # discard the update if nobody watching - if pfw.ioerrno != r || (r == 0 && pfw.prev_stat != curr_stat) - if r == 0 - pfw.prev_stat = curr_stat - end - pfw.ioerrno = r - notify(pfw.notify, true) - end - pfw.timer = Timer(pfw.interval) do t - # async task - iolock_begin() - lock(pfw.notify) - try - if pfw.timer === t # use identity check to test if this callback is stale by the time we got the lock - pfw.timer = nothing - @assert !pfw.active - if isopen(pfw) && !isempty(pfw.notify) - preserve_handle(pfw) - err = ccall(:uv_fs_stat, Cint, (Ptr{Cvoid}, Ptr{Cvoid}, Cstring, Ptr{Cvoid}), - eventloop(), pfw.stat_req, pfw.file, uv_jl_fspollcb) - err == 0 || notify(pfw.notify, _UVError("PollingFileWatcher (start)", err), error=true) # likely just ENOMEM - pfw.active = true - end - end - finally - unlock(pfw.notify) - end - iolock_end() - nothing - end - end - finally - unlock(pfw.notify) - end - nothing -end - -global uv_jl_pollcb::Ptr{Cvoid} -global uv_jl_fspollcb::Ptr{Cvoid} -global uv_jl_fseventscb_file::Ptr{Cvoid} -global uv_jl_fseventscb_folder::Ptr{Cvoid} - -function __init__() - global uv_jl_pollcb = @cfunction(uv_pollcb, Cvoid, (Ptr{Cvoid}, Cint, Cint)) - global uv_jl_fspollcb = @cfunction(uv_fspollcb, Cvoid, (Ptr{Cvoid},)) - global uv_jl_fseventscb_file = @cfunction(uv_fseventscb_file, Cvoid, (Ptr{Cvoid}, Ptr{Int8}, Int32, Int32)) - global uv_jl_fseventscb_folder = @cfunction(uv_fseventscb_folder, Cvoid, (Ptr{Cvoid}, Ptr{Int8}, Int32, Int32)) - - Base.mkpidlock_hook = mkpidlock - Base.trymkpidlock_hook = trymkpidlock - Base.parse_pidfile_hook = Pidfile.parse_pidfile - - nothing -end - -function start_watching(t::_FDWatcher) - iolock_begin() - t.handle == C_NULL && throw(ArgumentError("FDWatcher is closed")) - readable = t.refcount[1] > 0 - writable = t.refcount[2] > 0 - if t.active[1] != readable || t.active[2] != writable - # make sure the READABLE / WRITEABLE state is updated - uv_error("FDWatcher (start)", - ccall(:uv_poll_start, Int32, (Ptr{Cvoid}, Int32, Ptr{Cvoid}), - t.handle, - (readable ? UV_READABLE : 0) | (writable ? UV_WRITABLE : 0), - uv_jl_pollcb::Ptr{Cvoid})) - t.active = (readable, writable) - end - iolock_end() - nothing -end +Base.stat(pfw::PollingFileWatcher) = Base.checkstat(@lock pfw.notify pfw.prev_stat) # n.b. this _wait may return spuriously early with a timedout event function _wait(fdw::_FDWatcher, mask::FDEvent) @@ -549,7 +594,20 @@ function _wait(fdw::_FDWatcher, mask::FDEvent) if !isopen(fdw) # !open throw(EOFError()) elseif events.timedout - start_watching(fdw) # make sure the poll is active + fdw.handle == C_NULL && throw(ArgumentError("FDWatcher is closed")) + # start_watching to make sure the poll is active + readable = fdw.refcount[1] > 0 + writable = fdw.refcount[2] > 0 + if fdw.active[1] != readable || fdw.active[2] != writable + # make sure the READABLE / WRITEABLE state is updated + uv_jl_pollcb = @cfunction(uv_pollcb, Cvoid, (Ptr{Cvoid}, Cint, Cint)) + uv_error("FDWatcher (start)", + ccall(:uv_poll_start, Int32, (Ptr{Cvoid}, Int32, Ptr{Cvoid}), + fdw.handle, + (readable ? UV_READABLE : 0) | (writable ? UV_WRITABLE : 0), + uv_jl_pollcb::Ptr{Cvoid})) + fdw.active = (readable, writable) + end iolock_end() return FDEvent(wait(fdw.notify)::Int32) else @@ -631,8 +689,9 @@ function wait(pfw::PollingFileWatcher) # start_watching if !pfw.active preserve_handle(pfw) + uv_jl_fspollcb = @cfunction(uv_fspollcb, Cvoid, (Ptr{Cvoid},)) err = ccall(:uv_fs_stat, Cint, (Ptr{Cvoid}, Ptr{Cvoid}, Cstring, Ptr{Cvoid}), - eventloop(), pfw.stat_req, pfw.file, uv_jl_fspollcb) + eventloop(), pfw.stat_req, pfw.file, uv_jl_fspollcb::Ptr{Cvoid}) err == 0 || uv_error("PollingFileWatcher (start)", err) # likely just ENOMEM pfw.active = true end @@ -664,7 +723,8 @@ function wait(pfw::PollingFileWatcher) if !havechange # user canceled by calling close return prevstat, EOFError() end - # grab the most up-to-date stat result as of this time, even if it was a bit newer than the notify call + # grab the most up-to-date stat result as of this time, even if it was a bit newer than + # the notify call (unlikely, as there would need to be a concurrent call to wait) lock(pfw.notify) currstat = pfw.prev_stat ioerrno = pfw.ioerrno @@ -729,6 +789,10 @@ least one of them must be set to `true`. The returned value is an object with boolean fields `readable`, `writable`, and `timedout`, giving the result of the polling. + +This is a thin wrapper over calling `wait` on a [`FDWatcher`](@ref), which implements the +functionality but requires the user to call `close` manually when finished with it, or risk +serious crashes. """ function poll_fd(s::Union{RawFD, Sys.iswindows() ? WindowsRawSocket : Union{}}, timeout_s::Real=-1; readable=false, writable=false) mask = FDEvent(readable, writable, false, false) @@ -786,6 +850,15 @@ giving the result of watching the file. This behavior of this function varies slightly across platforms. See for more detailed information. + +This is a thin wrapper over calling `wait` on a [`FileMonitor`](@ref). This function has a +small race window between consecutive calls to `watch_file` where the file might change +without being detected. To avoid this race, use + + fm = FileMonitor(path) + wait(fm) + +directly, re-using the same `fm` each time you `wait`. """ function watch_file(s::String, timeout_s::Float64=-1.0) fm = FileMonitor(s) @@ -812,7 +885,7 @@ watch_file(s::AbstractString, timeout_s::Real=-1) = watch_file(String(s), Float6 """ watch_folder(path::AbstractString, timeout_s::Real=-1) -Watches a file or directory `path` for changes until a change has occurred or `timeout_s` +Watch a file or directory `path` for changes until a change has occurred or `timeout_s` seconds have elapsed. This function does not poll the file system and instead uses platform-specific functionality to receive notifications from the operating system (e.g. via inotify on Linux). See the NodeJS documentation linked below for details. @@ -826,6 +899,8 @@ giving the event. This behavior of this function varies slightly across platforms. See for more detailed information. + +This function is a thin wrapper over calling `wait` on a [`FolderMonitor`](@ref), with added timeout support. """ watch_folder(s::AbstractString, timeout_s::Real=-1) = watch_folder(String(s), timeout_s) function watch_folder(s::String, timeout_s::Real=-1) @@ -895,11 +970,15 @@ The `previous` status is always a `StatStruct`, but it may have all of the field (indicating the file didn't previously exist, or wasn't previously accessible). The `current` status object may be a `StatStruct`, an `EOFError` (indicating the timeout elapsed), -or some other `Exception` subtype (if the `stat` operation failed - for example, if the path does not exist). +or some other `Exception` subtype (if the `stat` operation failed: for example, if the path does not exist). To determine when a file was modified, compare `!(current isa StatStruct && prev == current)` to detect notification of changes to the mtime or inode. However, using [`watch_file`](@ref) for this operation is preferred, since it is more reliable and efficient, although in some situations it may not be available. + +This is a thin wrapper over calling `wait` on a [`PollingFileWatcher`](@ref), which implements +the functionality, but this function has a small race window between consecutive calls to +`poll_file` where the file might change without being detected. """ function poll_file(s::AbstractString, interval_seconds::Real=5.007, timeout_s::Real=-1) pfw = PollingFileWatcher(s, Float64(interval_seconds)) @@ -920,4 +999,11 @@ end include("pidfile.jl") import .Pidfile: mkpidlock, trymkpidlock +function __init__() + Base.mkpidlock_hook = mkpidlock + Base.trymkpidlock_hook = trymkpidlock + Base.parse_pidfile_hook = Pidfile.parse_pidfile + nothing +end + end diff --git a/stdlib/FileWatching/test/runtests.jl b/stdlib/FileWatching/test/runtests.jl index 11df8849048f8..def555154264d 100644 --- a/stdlib/FileWatching/test/runtests.jl +++ b/stdlib/FileWatching/test/runtests.jl @@ -452,10 +452,6 @@ rm(dir) include("pidfile.jl") end -@testset "Docstrings" begin - undoc = Docs.undocumented_names(FileWatching) - @test_broken isempty(undoc) - @test undoc == [:FDWatcher, :FileMonitor, :FolderMonitor, :PollingFileWatcher] -end +@test isempty(Docs.undocumented_names(FileWatching)) end # testset