Skip to content

Commit

Permalink
fix println performance regression
Browse files Browse the repository at this point in the history
  • Loading branch information
amitmurthy committed Apr 2, 2015
1 parent 3d2d84e commit 13a6f5c
Show file tree
Hide file tree
Showing 11 changed files with 131 additions and 67 deletions.
4 changes: 4 additions & 0 deletions NEWS.md
Original file line number Diff line number Diff line change
Expand Up @@ -237,6 +237,10 @@ Library improvements

* `readavailable` returns a byte vector instead of a string.

* `lock` and `unlock` which operate on `ReentrantLock`. Useful to lock a stream during
concurrent writes from multiple tasks


Deprecated or removed
---------------------

Expand Down
3 changes: 3 additions & 0 deletions base/exports.jl
Original file line number Diff line number Diff line change
Expand Up @@ -1000,10 +1000,13 @@ export
current_task,
istaskstarted,
istaskdone,
lock,
notify,
produce,
ReentrantLock,
schedule,
task_local_storage,
unlock,
yield,

# time
Expand Down
37 changes: 37 additions & 0 deletions base/lock.jl
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
# Advisory reentrant lock
type ReentrantLock
locked_by::Nullable{Task}
cond_wait::Condition
reentrancy_cnt::Int

ReentrantLock() = new(nothing, Condition(), 0)
end

function lock(rl::ReentrantLock)
t = current_task()
while true
if rl.reentrancy_cnt == 0
rl.locked_by = t
rl.reentrancy_cnt = 1
return
elseif t == get(rl.locked_by)
rl.reentrancy_cnt += 1
return
end
wait(rl.cond_wait)
end
end

unlock(o::Any) = unlock(o.lock)

function unlock(rl::ReentrantLock)
rl.reentrancy_cnt = rl.reentrancy_cnt - 1
if rl.reentrancy_cnt == 0
rl.locked_by = nothing
notify(rl.cond_wait)
elseif rl.reentrancy_cnt < 0
AssertionError("unlock count must match lock count")
end
rl
end

5 changes: 4 additions & 1 deletion base/multi.jl
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,8 @@ end
function send_msg_(w::Worker, kind, args, now::Bool)
#println("Sending msg $kind")
io = w.w_stream
lock(io) do io
lock(io.lock)
try
serialize(io, kind)
for arg in args
serialize(io, arg)
Expand All @@ -178,6 +179,8 @@ function send_msg_(w::Worker, kind, args, now::Bool)
else
flush(io)
end
finally
unlock(io.lock)
end
end

Expand Down
4 changes: 3 additions & 1 deletion base/socket.jl
Original file line number Diff line number Diff line change
Expand Up @@ -259,6 +259,7 @@ type TCPSocket <: Socket
closecb::Callback
closenotify::Condition
sendbuf::Nullable{IOBuffer}
lock::ReentrantLock

TCPSocket(handle) = new(
handle,
Expand All @@ -268,7 +269,8 @@ type TCPSocket <: Socket
false, Condition(),
false, Condition(),
false, Condition(),
nothing
nothing,
ReentrantLock()
)
end
function TCPSocket()
Expand Down
20 changes: 17 additions & 3 deletions base/stream.jl
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,8 @@ type Pipe <: AsyncStream
closecb::Callback
closenotify::Condition
sendbuf::Nullable{IOBuffer}
lock::ReentrantLock

Pipe(handle) = new(
handle,
StatusUninit,
Expand All @@ -109,7 +111,7 @@ type Pipe <: AsyncStream
false,Condition(),
false,Condition(),
false,Condition(),
nothing)
nothing, ReentrantLock())
end
function Pipe()
handle = Libc.malloc(_sizeof_uv_named_pipe)
Expand Down Expand Up @@ -177,6 +179,7 @@ type TTY <: AsyncStream
closecb::Callback
closenotify::Condition
sendbuf::Nullable{IOBuffer}
lock::ReentrantLock
@windows_only ispty::Bool
function TTY(handle)
tty = new(
Expand All @@ -186,7 +189,7 @@ type TTY <: AsyncStream
PipeBuffer(),
false,Condition(),
false,Condition(),
nothing)
nothing, ReentrantLock())
@windows_only tty.ispty = Bool(ccall(:jl_ispty, Cint, (Ptr{Void},), handle))
tty
end
Expand Down Expand Up @@ -217,6 +220,16 @@ nb_available(stream::UVStream) = nb_available(stream.buffer)
show(io::IO,stream::TTY) = print(io,"TTY(",uv_status_string(stream),", ",
nb_available(stream.buffer)," bytes waiting)")

function println(io::AsyncStream, xs...)
lock(io.lock)
try
invoke(println, tuple(IO, typeof(xs)...), io, xs...)
finally
unlock(io.lock)
end
end


uvtype(::AsyncStream) = UV_STREAM
uvhandle(stream::AsyncStream) = stream.handle

Expand Down Expand Up @@ -957,8 +970,9 @@ type BufferStream <: AsyncStream
close_c::Condition
is_open::Bool
buffer_writes::Bool
lock::ReentrantLock

BufferStream() = new(PipeBuffer(), Condition(), Condition(), true, false)
BufferStream() = new(PipeBuffer(), Condition(), Condition(), true, false, ReentrantLock())
end

isopen(s::BufferStream) = s.is_open
Expand Down
7 changes: 2 additions & 5 deletions base/string.jl
Original file line number Diff line number Diff line change
@@ -1,11 +1,8 @@
## core text I/O ##

print(io::IO, x) = show(io, x)
function print(io::IO, xs...)
lock(io) do io
for x in xs print(io, x) end
end
end
print(io::IO, xs...) = for x in xs print(io, x) end

println(io::IO, xs...) = print(io, xs..., '\n')

print(xs...) = print(STDOUT, xs...)
Expand Down
1 change: 1 addition & 0 deletions base/sysimg.jl
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,7 @@ include("nullable.jl")

# I/O
include("task.jl")
include("lock.jl")
include("show.jl")
include("stream.jl")
include("socket.jl")
Expand Down
57 changes: 0 additions & 57 deletions base/util.jl
Original file line number Diff line number Diff line change
Expand Up @@ -268,60 +268,3 @@ end

julia_exename() = ccall(:jl_is_debugbuild,Cint,())==0 ? "julia" : "julia-debug"

# Advisory reentrant lock
type ReentrantLock
locked_by::Nullable{Task}
cond_wait::Condition
reentrancy_cnt::Int

ReentrantLock() = new(nothing, Condition(), 0)
end

# Lock object during function execution. Recursive calls by the same task is OK.
const adv_locks_map = WeakKeyDict{Any, ReentrantLock}()
function lock(f::Function, o::Any)
rl = get(adv_locks_map, o, nothing)
if rl == nothing
rl = ReentrantLock()
adv_locks_map[o] = rl
end
lock(rl)

try
f(o)
finally
unlock(o)
end
end

function lock(rl::ReentrantLock)
t = current_task()
while true
if rl.reentrancy_cnt == 0
rl.locked_by = t
rl.reentrancy_cnt = 1
return
elseif t == get(rl.locked_by)
rl.reentrancy_cnt += 1
return
end
wait(rl.cond_wait)
end
end

function unlock(o::Any)
rl = adv_locks_map[o]
unlock(rl)
end

function unlock(rl::ReentrantLock)
rl.reentrancy_cnt = rl.reentrancy_cnt - 1
if rl.reentrancy_cnt == 0
rl.locked_by = nothing
notify(rl.cond_wait)
elseif rl.reentrancy_cnt < 0
AssertionError("unlock count must match lock count")
end
rl
end

44 changes: 44 additions & 0 deletions doc/manual/faq.rst
Original file line number Diff line number Diff line change
Expand Up @@ -833,6 +833,50 @@ potential performance optimizations that can be achieved by other
means (e.g., using explicit loops), operators like ``+=`` and ``*=``
work by rebinding new values.

Asynchronous IO and concurrent synchronous writes
-------------------------------------------------

Why do concurrent writes to the same stream result in inter-mixed output?
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

While the streaming I/O API is synchronous, the underlying implementation
is fully asynchronous.

The following::

@sync for i in 1:3
@async print(i, " Foo ", " Bar ")
end

results in::
123 Foo Foo Foo Bar Bar Bar

This is happening because, while ``print(i, " Foo ", " Bar ")`` is synchronous,
internally, the writing of each argument yields to other tasks while waiting for
that part of the I/O to complete.

``println`` to asynchronous streams like STDOUT, TcpSockets, "locks" the stream
during a call. Consequently changing ``print`` to ``println`` in the above example
results in::

1 Foo Bar
2 Foo Bar
3 Foo Bar

For other functions and streams, etc, you could lock your writes with a ``ReentrantLock``
like this::

l = ReentrantLock()
@sync for i in 1:3
@async begin
lock(l)
try
print(i, " Foo ", " Bar ")
finally
unlock(l)
end
end


Julia Releases
----------------
Expand Down
16 changes: 16 additions & 0 deletions doc/stdlib/parallel.rst
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,22 @@ Tasks
Block the current task for a specified number of seconds. The minimum sleep
time is 1 millisecond or input of ``0.001``.

.. function:: ReentrantLock()

Creates a reentrant lock. The same task can acquire the lock as many times
as required. Each lock must be matched with an unlock.

.. function:: lock(l::ReentrantLock)

Associates ``l`` with the current task. If ``l`` is already locked by a different
task, waits for it to become available. The same task can acquire the lock multiple
times. Each "lock" must be matched by an "unlock"

.. function:: unlock(l::ReentrantLock)

Releases ownership of the lock by the current task. If the lock had been acquired before,
it just decrements an internal counter and returns immediately.


General Parallel Computing Support
----------------------------------
Expand Down

0 comments on commit 13a6f5c

Please sign in to comment.