diff --git a/NEWS.md b/NEWS.md index e35771e0690ba..3ec9360a39bef 100644 --- a/NEWS.md +++ b/NEWS.md @@ -237,6 +237,10 @@ Library improvements * `readavailable` returns a byte vector instead of a string. + * `lock` and `unlock` which operate on `ReentrantLock`. Useful to lock a stream during + concurrent writes from multiple tasks + + Deprecated or removed --------------------- diff --git a/base/exports.jl b/base/exports.jl index c0ee284652ca6..de9a9548cadb9 100644 --- a/base/exports.jl +++ b/base/exports.jl @@ -1000,10 +1000,13 @@ export current_task, istaskstarted, istaskdone, + lock, notify, produce, + ReentrantLock, schedule, task_local_storage, + unlock, yield, # time diff --git a/base/lock.jl b/base/lock.jl new file mode 100644 index 0000000000000..9d6572626b19a --- /dev/null +++ b/base/lock.jl @@ -0,0 +1,37 @@ +# Advisory reentrant lock +type ReentrantLock + locked_by::Nullable{Task} + cond_wait::Condition + reentrancy_cnt::Int + + ReentrantLock() = new(nothing, Condition(), 0) +end + +function lock(rl::ReentrantLock) + t = current_task() + while true + if rl.reentrancy_cnt == 0 + rl.locked_by = t + rl.reentrancy_cnt = 1 + return + elseif t == get(rl.locked_by) + rl.reentrancy_cnt += 1 + return + end + wait(rl.cond_wait) + end +end + +unlock(o::Any) = unlock(o.lock) + +function unlock(rl::ReentrantLock) + rl.reentrancy_cnt = rl.reentrancy_cnt - 1 + if rl.reentrancy_cnt == 0 + rl.locked_by = nothing + notify(rl.cond_wait) + elseif rl.reentrancy_cnt < 0 + AssertionError("unlock count must match lock count") + end + rl +end + diff --git a/base/multi.jl b/base/multi.jl index 0842196d4757c..74b5e14e48ccc 100644 --- a/base/multi.jl +++ b/base/multi.jl @@ -167,7 +167,8 @@ end function send_msg_(w::Worker, kind, args, now::Bool) #println("Sending msg $kind") io = w.w_stream - lock(io) do io + lock(io.lock) + try serialize(io, kind) for arg in args serialize(io, arg) @@ -178,6 +179,8 @@ function send_msg_(w::Worker, kind, args, now::Bool) else flush(io) end + finally + unlock(io.lock) end end diff --git a/base/socket.jl b/base/socket.jl index 039250d009073..8c9ec2b8a1df6 100644 --- a/base/socket.jl +++ b/base/socket.jl @@ -259,6 +259,7 @@ type TCPSocket <: Socket closecb::Callback closenotify::Condition sendbuf::Nullable{IOBuffer} + lock::ReentrantLock TCPSocket(handle) = new( handle, @@ -268,7 +269,8 @@ type TCPSocket <: Socket false, Condition(), false, Condition(), false, Condition(), - nothing + nothing, + ReentrantLock() ) end function TCPSocket() diff --git a/base/stream.jl b/base/stream.jl index 4277869a66a08..d45e817adf246 100644 --- a/base/stream.jl +++ b/base/stream.jl @@ -101,6 +101,8 @@ type Pipe <: AsyncStream closecb::Callback closenotify::Condition sendbuf::Nullable{IOBuffer} + lock::ReentrantLock + Pipe(handle) = new( handle, StatusUninit, @@ -109,7 +111,7 @@ type Pipe <: AsyncStream false,Condition(), false,Condition(), false,Condition(), - nothing) + nothing, ReentrantLock()) end function Pipe() handle = Libc.malloc(_sizeof_uv_named_pipe) @@ -177,6 +179,7 @@ type TTY <: AsyncStream closecb::Callback closenotify::Condition sendbuf::Nullable{IOBuffer} + lock::ReentrantLock @windows_only ispty::Bool function TTY(handle) tty = new( @@ -186,7 +189,7 @@ type TTY <: AsyncStream PipeBuffer(), false,Condition(), false,Condition(), - nothing) + nothing, ReentrantLock()) @windows_only tty.ispty = Bool(ccall(:jl_ispty, Cint, (Ptr{Void},), handle)) tty end @@ -217,6 +220,16 @@ nb_available(stream::UVStream) = nb_available(stream.buffer) show(io::IO,stream::TTY) = print(io,"TTY(",uv_status_string(stream),", ", nb_available(stream.buffer)," bytes waiting)") +function println(io::AsyncStream, xs...) + lock(io.lock) + try + invoke(println, tuple(IO, typeof(xs)...), io, xs...) + finally + unlock(io.lock) + end +end + + uvtype(::AsyncStream) = UV_STREAM uvhandle(stream::AsyncStream) = stream.handle @@ -957,8 +970,9 @@ type BufferStream <: AsyncStream close_c::Condition is_open::Bool buffer_writes::Bool + lock::ReentrantLock - BufferStream() = new(PipeBuffer(), Condition(), Condition(), true, false) + BufferStream() = new(PipeBuffer(), Condition(), Condition(), true, false, ReentrantLock()) end isopen(s::BufferStream) = s.is_open diff --git a/base/string.jl b/base/string.jl index 8d991f1510a2e..61b656724e27d 100644 --- a/base/string.jl +++ b/base/string.jl @@ -1,11 +1,8 @@ ## core text I/O ## print(io::IO, x) = show(io, x) -function print(io::IO, xs...) - lock(io) do io - for x in xs print(io, x) end - end -end +print(io::IO, xs...) = for x in xs print(io, x) end + println(io::IO, xs...) = print(io, xs..., '\n') print(xs...) = print(STDOUT, xs...) diff --git a/base/sysimg.jl b/base/sysimg.jl index 4a6f283f9c508..8554dd9043d8f 100644 --- a/base/sysimg.jl +++ b/base/sysimg.jl @@ -113,6 +113,7 @@ include("nullable.jl") # I/O include("task.jl") +include("lock.jl") include("show.jl") include("stream.jl") include("socket.jl") diff --git a/base/util.jl b/base/util.jl index 2628aff4b94ff..08451e650596f 100644 --- a/base/util.jl +++ b/base/util.jl @@ -268,60 +268,3 @@ end julia_exename() = ccall(:jl_is_debugbuild,Cint,())==0 ? "julia" : "julia-debug" -# Advisory reentrant lock -type ReentrantLock - locked_by::Nullable{Task} - cond_wait::Condition - reentrancy_cnt::Int - - ReentrantLock() = new(nothing, Condition(), 0) -end - -# Lock object during function execution. Recursive calls by the same task is OK. -const adv_locks_map = WeakKeyDict{Any, ReentrantLock}() -function lock(f::Function, o::Any) - rl = get(adv_locks_map, o, nothing) - if rl == nothing - rl = ReentrantLock() - adv_locks_map[o] = rl - end - lock(rl) - - try - f(o) - finally - unlock(o) - end -end - -function lock(rl::ReentrantLock) - t = current_task() - while true - if rl.reentrancy_cnt == 0 - rl.locked_by = t - rl.reentrancy_cnt = 1 - return - elseif t == get(rl.locked_by) - rl.reentrancy_cnt += 1 - return - end - wait(rl.cond_wait) - end -end - -function unlock(o::Any) - rl = adv_locks_map[o] - unlock(rl) -end - -function unlock(rl::ReentrantLock) - rl.reentrancy_cnt = rl.reentrancy_cnt - 1 - if rl.reentrancy_cnt == 0 - rl.locked_by = nothing - notify(rl.cond_wait) - elseif rl.reentrancy_cnt < 0 - AssertionError("unlock count must match lock count") - end - rl -end - diff --git a/doc/manual/faq.rst b/doc/manual/faq.rst index 87e5196c1a6d6..efba783b285eb 100644 --- a/doc/manual/faq.rst +++ b/doc/manual/faq.rst @@ -833,6 +833,50 @@ potential performance optimizations that can be achieved by other means (e.g., using explicit loops), operators like ``+=`` and ``*=`` work by rebinding new values. +Asynchronous IO and concurrent synchronous writes +------------------------------------------------- + +Why do concurrent writes to the same stream result in inter-mixed output? +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + +While the streaming I/O API is synchronous, the underlying implementation +is fully asynchronous. + +The following:: + + @sync for i in 1:3 + @async print(i, " Foo ", " Bar ") + end + +results in:: + 123 Foo Foo Foo Bar Bar Bar + +This is happening because, while ``print(i, " Foo ", " Bar ")`` is synchronous, +internally, the writing of each argument yields to other tasks while waiting for +that part of the I/O to complete. + +``println`` to asynchronous streams like STDOUT, TcpSockets, "locks" the stream +during a call. Consequently changing ``print`` to ``println`` in the above example +results in:: + + 1 Foo Bar + 2 Foo Bar + 3 Foo Bar + +For other functions and streams, etc, you could lock your writes with a ``ReentrantLock`` +like this:: + + l = ReentrantLock() + @sync for i in 1:3 + @async begin + lock(l) + try + print(i, " Foo ", " Bar ") + finally + unlock(l) + end + end + Julia Releases ---------------- diff --git a/doc/stdlib/parallel.rst b/doc/stdlib/parallel.rst index 5bcec5626808c..6f8f1542697c8 100644 --- a/doc/stdlib/parallel.rst +++ b/doc/stdlib/parallel.rst @@ -98,6 +98,22 @@ Tasks Block the current task for a specified number of seconds. The minimum sleep time is 1 millisecond or input of ``0.001``. +.. function:: ReentrantLock() + + Creates a reentrant lock. The same task can acquire the lock as many times + as required. Each lock must be matched with an unlock. + +.. function:: lock(l::ReentrantLock) + + Associates ``l`` with the current task. If ``l`` is already locked by a different + task, waits for it to become available. The same task can acquire the lock multiple + times. Each "lock" must be matched by an "unlock" + +.. function:: unlock(l::ReentrantLock) + + Releases ownership of the lock by the current task. If the lock had been acquired before, + it just decrements an internal counter and returns immediately. + General Parallel Computing Support ----------------------------------