Skip to content

Commit

Permalink
Merge branch 'master' of github.com:JuliaLang/julia
Browse files Browse the repository at this point in the history
* 'master' of github.com:JuliaLang/julia:
  fixing issue #144, handling None better
  fixing issue #137 disable rint, nearbyint, and lrint call lround iround in a previous commit I added trunc, floor, and ceil for integers
  a DGC tweak, and making temp vars local in timing macros
  removing color profile tags from images so firefox doesn't mess up the colors
  fixing issue #146, out of memory error
  fixing regression in reinterpret()
  how the hell did i delete the makefile in my previous commit?
  new web terminal design
  important fix to the message latency fix. the timing is now done in a separate thread so messages can't be delayed by compute tasks.
  Actually fixed permute (for Tensors n>2 dimensions), and fixed ipermute (which uses permute)
  limiting memory/cpu usage and number of active sessions for robustness
  Improved the tensor.j find command, fixed the permute function. Ipermute still needs to be fixed
  no longer crashes if thread creation fails
  better handling of expired sessions
  server no longer crashes when users call exit()
  • Loading branch information
StefanKarpinski committed Jul 22, 2011
2 parents 6008d6c + 226e4ba commit b5958c6
Show file tree
Hide file tree
Showing 25 changed files with 485 additions and 417 deletions.
2 changes: 1 addition & 1 deletion j/array.j
Original file line number Diff line number Diff line change
Expand Up @@ -289,7 +289,7 @@ hcat(A::Array...) = cat(2, A...)

function reinterpret{T,S}(::Type{T}, a::Array{S})
b = Array(T, div(numel(a)*sizeof(S),sizeof(T)))
copy_to(b, a, ulong(length(b)*sizeof(T)))
copy_to(pointer(b), pointer(a), ulong(length(b)*sizeof(T)))
return b
end
reinterpret(t,x) = reinterpret(t,[x])[1]
Expand Down
21 changes: 13 additions & 8 deletions j/math_libm.j
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,16 @@ macro libfdmfunc_1arg_float(T,f)
end
end

macro libmfunc_1arg_int(T,f)
macro libmfunc_1arg_int(T,f,name...)
if length(name)>0
fname = name[1]
else
fname = f
end
quote
($f)(x::Float64) = ccall(dlsym(libm,$string(f)), Int32, (Float64,), x)
($f)(x::Float32) = ccall(dlsym(libm,$strcat(string(f),"f")), Int32, (Float32,), x)
@vectorize_1arg $T $f
($fname)(x::Float64) = ccall(dlsym(libm,$string(f)), Int32, (Float64,), x)
($fname)(x::Float32) = ccall(dlsym(libm,$strcat(string(f),"f")), Int32, (Float32,), x)
@vectorize_1arg $T $fname
end
end

Expand Down Expand Up @@ -57,17 +62,17 @@ end
@libfdmfunc_1arg_float Number erfc
@libfdmfunc_1arg_float Real ceil
@libfdmfunc_1arg_float Real floor
@libfdmfunc_1arg_float Real rint
#@libfdmfunc_1arg_float Real rint
@libfdmfunc_1arg_float Number lgamma

@libmfunc_1arg_float Number sqrt
@libmfunc_1arg_float Number exp2
@libmfunc_1arg_float Real nearbyint
#@libmfunc_1arg_float Real nearbyint
@libmfunc_1arg_float Real trunc
@libmfunc_1arg_float Real round

@libmfunc_1arg_int Real lrint
@libmfunc_1arg_int Real lround
#@libmfunc_1arg_int Real lrint
@libmfunc_1arg_int Real lround iround
@libmfunc_1arg_int Real ilogb

@libfdmfunc_2arg Number atan2
Expand Down
146 changes: 53 additions & 93 deletions j/multi.j
Original file line number Diff line number Diff line change
Expand Up @@ -39,45 +39,6 @@
##
## @bcast expr - run expr everywhere. useful for load().

## message i/o ##

function send_msg(s::IOStream, buf::IOStream, kind, args)
serialize(buf, kind)
for arg=args
serialize(buf, arg)
end
ccall(:jl_enq_send_req, Void, (Ptr{Void}, Ptr{Void}),
s.ios, buf.ios)
#ccall(:ios_write_direct, PtrInt, (Ptr{Void}, Ptr{Void}),
# s.ios, buf.ios)
end

SENDBUF = ()
function send_msg_unknown(s::IOStream, kind, args)
# for sending to a socket not associated with a worker
global SENDBUF
if is(SENDBUF,())
SENDBUF = memio()
end
send_msg(s, SENDBUF::IOStream, kind, args)
end

function send_msg(s::IOStream, kind, args...)
id = worker_id_from_socket(s)
if id > -1
return send_msg(worker_from_id(id), kind, args...)
end
send_msg_unknown(s, kind, args)
end

function send_msg_now(s::IOStream, kind, args...)
id = worker_id_from_socket(s)
if id > -1
return send_msg_now(worker_from_id(id), kind, args...)
end
send_msg_unknown(s, kind, args)
end

# todo:
# - more indexing
# - take() to empty a Ref (full/empty variables)
Expand All @@ -101,7 +62,27 @@ end
# * add readline to event loop
# * GOs/darrays on a subset of nodes

## process group creation ##
## workers and message i/o ##

function send_msg_unknown(s::IOStream, kind, args)
error("attempt to send to unknown socket")
end

function send_msg(s::IOStream, kind, args...)
id = worker_id_from_socket(s)
if id > -1
return send_msg(worker_from_id(id), kind, args...)
end
send_msg_unknown(s, kind, args)
end

function send_msg_now(s::IOStream, kind, args...)
id = worker_id_from_socket(s)
if id > -1
return send_msg_now(worker_from_id(id), kind, args...)
end
send_msg_unknown(s, kind, args)
end

type Worker
host::String
Expand All @@ -111,8 +92,6 @@ type Worker
sendbuf::IOStream
id::Int32
del_msgs::Array{Any,1}
lastmsg::Float64
dirty::Bool

function Worker(host, port)
fd = ccall(:connect_to_host, Int32,
Expand All @@ -123,70 +102,54 @@ type Worker
Worker(host, port, fd, fdio(fd))
end

Worker(host,port,fd,sock,id) =
new(host, port, fd, sock, memio(), id, {}, 0.0, false)
Worker(host,port,fd,sock,id) = new(host, port, fd, sock, memio(), id, {})
Worker(host,port,fd,sock) = Worker(host,port,fd,sock,0)
end

function flush_worker(w::Worker)
if !isempty(w.del_msgs)
#print("sending delete of $(w.del_msgs)\n")
remote_do(w, del_clients, w.del_msgs...)
del_all(w.del_msgs)
end
ccall(:jl_enq_send_req, Void, (Ptr{Void}, Ptr{Void}),
w.socket.ios, w.sendbuf.ios)
w.dirty = false
end

function send_msg_now(w::Worker, kind, args...)
send_msg(w.socket, w.sendbuf, kind, args)
w.dirty = false
send_msg_(w, kind, args, true)
end

function send_msg(w::Worker, kind, args...)
send_msg_(w, kind, args, false)
end

function flush_gc_msgs(w::Worker)
msgs = w.del_msgs
w.del_msgs = {}
#print("sending delete of $msgs\n")
remote_do(w, del_clients, msgs...)
end

function send_msg_(w::Worker, kind, args, now::Bool)
buf = w.sendbuf
ccall(:jl_buf_mutex_lock, Void, ())
serialize(buf, kind)
for arg=args
serialize(buf, arg)
end
if !w.dirty
w.lastmsg = clock()
ccall(:jl_buf_mutex_unlock, Void, ())

if !now && !isempty(w.del_msgs)
flush_gc_msgs(w)
else
ccall(:jl_enq_send_req, Void, (Ptr{Void}, Ptr{Void}, Int32),
w.socket.ios, w.sendbuf.ios, now ? int32(1) : int32(0))
end
w.dirty = true
end

function flush_workers()
global PGRP
now = 0.0
for w = PGRP.workers
if isa(w,Worker) && (w::Worker).dirty
function flush_gc_msgs()
for w = (PGRP::ProcessGroup).workers
if isa(w,Worker)
k = w::Worker
if now==0.0
now = clock()
end
if now-k.lastmsg >= 0.0002
flush_worker(k)
if !isempty(k.del_msgs)
flush_gc_msgs(k)
end
end
end
end

# determine the maximum time we can sleep in select
function max_sleep_time()
global PGRP
mt = 10.0
now = 0.0
for w = PGRP.workers
if isa(w,Worker) && (w::Worker).dirty
if now==0.0
now = clock()
end
mt = min(mt,(w::Worker).lastmsg+0.0002-now)
end
end
max(mt,0.0)
end
## process group creation ##

type LocalProcess
end
Expand Down Expand Up @@ -407,10 +370,6 @@ function send_del_client(rr::RemoteRef)
else
w = worker_from_id(rr.where)
push(w.del_msgs, (rr2id(rr), myid()))
if !w.dirty
w.lastmsg = clock()
end
w.dirty = true
end
end

Expand Down Expand Up @@ -1466,9 +1425,11 @@ function event_loop(isclient)
add(fdset, fd)
end

nselect = select_read(fdset,
isempty(Workqueue) ? max_sleep_time() :
0.0)
bored = isempty(Workqueue)
if bored
flush_gc_msgs()
end
nselect = select_read(fdset, bored ? 10.0 : 0.0)
if nselect == 0
if !isempty(Workqueue)
perform_work()
Expand All @@ -1481,7 +1442,6 @@ function event_loop(isclient)
end
end
end
flush_workers()
end
catch e
if isa(e,DisconnectException)
Expand Down
3 changes: 3 additions & 0 deletions j/promotion.j
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
## promotion mechanism ##

promote_type{T}(::Type{T}) = T
promote_type(::Type{None}, ::Type{None}) = None
promote_type{T}(::Type{T}, ::Type{T}) = T
promote_type{T}(::Type{T}, ::Type{None}) = T
promote_type{T}(::Type{None}, ::Type{T}) = T
promote_type(S::Type, T::Type...) = promote_type(S, promote_type(T...))

function promote_type{T,S}(::Type{T}, ::Type{S})
Expand Down
Loading

0 comments on commit b5958c6

Please sign in to comment.