diff --git a/NEWS.md b/NEWS.md index b77a786c24823..641b84b480bb4 100644 --- a/NEWS.md +++ b/NEWS.md @@ -17,6 +17,11 @@ New language features - atomic set once (`@atomiconce v[3] = 2`), - atomic swap (`x = @atomicswap v[3] = 2`), and - atomic replace (`x = @atomicreplace v[3] 2=>5`). +- New option `--task-metrics=yes` to enable the collection of per-task timing information, + which can also be enabled/disabled at runtime with `Base.Experimental.task_metrics(::Bool)`. ([#56320]) + The available metrics are: + - actual running time for the task (`Base.Experimental.task_running_time_ns`), and + - wall-time for the task (`Base.Experimental.task_wall_time_ns`). Language changes ---------------- diff --git a/base/boot.jl b/base/boot.jl index f66ee69780193..4badedae3cfb7 100644 --- a/base/boot.jl +++ b/base/boot.jl @@ -175,15 +175,33 @@ #end #mutable struct Task -# parent::Task +# next::Any +# queue::Any # storage::Any -# state::Symbol # donenotify::Any # result::Any -# exception::Any -# backtrace::Any # scope::Any # code::Any +# @atomic _state::UInt8 +# sticky::UInt8 +# priority::UInt16 +# @atomic _isexception::UInt8 +# pad00::UInt8 +# pad01::UInt8 +# pad02::UInt8 +# rngState0::UInt64 +# rngState1::UInt64 +# rngState2::UInt64 +# rngState3::UInt64 +# rngState4::UInt64 +# const metrics_enabled::Bool +# pad10::UInt8 +# pad11::UInt8 +# pad12::UInt8 +# @atomic first_enqueued_at::UInt64 +# @atomic last_started_running_at::UInt64 +# @atomic running_time_ns::UInt64 +# @atomic finished_at::UInt64 #end export diff --git a/base/experimental.jl b/base/experimental.jl index 411bb2407cdc5..17871b4f346d6 100644 --- a/base/experimental.jl +++ b/base/experimental.jl @@ -503,4 +503,78 @@ usage, by eliminating the tracking of those possible invalidation. """ disable_new_worlds() = ccall(:jl_disable_new_worlds, Cvoid, ()) +### Task metrics + +""" + Base.Experimental.task_metrics(::Bool) + +Enable or disable the collection of per-task metrics. +A `Task` created when `Base.Experimental.task_metrics(true)` is in effect will have +[`Base.Experimental.task_running_time_ns`](@ref) and [`Base.Experimental.task_wall_time_ns`](@ref) +timing information available. + +!!! note + Task metrics can be enabled at start-up via the `--task-metrics=yes` command line option. +""" +function task_metrics(b::Bool) + if b + ccall(:jl_task_metrics_enable, Cvoid, ()) + else + ccall(:jl_task_metrics_disable, Cvoid, ()) + end + return nothing end + +""" + Base.Experimental.task_running_time_ns(t::Task) -> Union{UInt64, Nothing} + +Return the total nanoseconds that the task `t` has spent running. +This metric is only updated when `t` yields or completes unless `t` is the current task, in +which it will be updated continuously. +See also [`Base.Experimental.task_wall_time_ns`](@ref). + +Returns `nothing` if task timings are not enabled. +See [`Base.Experimental.task_metrics`](@ref). + +!!! note "This metric is from the Julia scheduler" + A task may be running on an OS thread that is descheduled by the OS + scheduler, this time still counts towards the metric. + +!!! compat "Julia 1.12" + This method was added in Julia 1.12. +""" +function task_running_time_ns(t::Task=current_task()) + t.metrics_enabled || return nothing + if t == current_task() + # These metrics fields can't update while we're running. + # But since we're running we need to include the time since we last started running! + return t.running_time_ns + (time_ns() - t.last_started_running_at) + else + return t.running_time_ns + end +end + +""" + Base.Experimental.task_wall_time_ns(t::Task) -> Union{UInt64, Nothing} + +Return the total nanoseconds that the task `t` was runnable. +This is the time since the task first entered the run queue until the time at which it +completed, or until the current time if the task has not yet completed. +See also [`Base.Experimental.task_running_time_ns`](@ref). + +Returns `nothing` if task timings are not enabled. +See [`Base.Experimental.task_metrics`](@ref). + +!!! compat "Julia 1.12" + This method was added in Julia 1.12. +""" +function task_wall_time_ns(t::Task=current_task()) + t.metrics_enabled || return nothing + start_at = t.first_enqueued_at + start_at == 0 && return UInt64(0) + end_at = t.finished_at + end_at == 0 && return time_ns() - start_at + return end_at - start_at +end + +end # module diff --git a/base/options.jl b/base/options.jl index 07baa3b51f65b..7e7808bd5c047 100644 --- a/base/options.jl +++ b/base/options.jl @@ -61,6 +61,7 @@ struct JLOptions heap_size_hint::UInt64 trace_compile_timing::Int8 trim::Int8 + task_metrics::Int8 end # This runs early in the sysimage != is not defined yet diff --git a/base/task.jl b/base/task.jl index 2a922c4b85f24..951e980ee903c 100644 --- a/base/task.jl +++ b/base/task.jl @@ -977,7 +977,11 @@ function enq_work(t::Task) return t end -schedule(t::Task) = enq_work(t) +function schedule(t::Task) + # [task] created -scheduled-> wait_time + maybe_record_enqueued!(t) + enq_work(t) +end """ schedule(t::Task, [val]; error=false) @@ -1031,6 +1035,8 @@ function schedule(t::Task, @nospecialize(arg); error=false) t.queue === nothing || Base.error("schedule: Task not runnable") setfield!(t, :result, arg) end + # [task] created -scheduled-> wait_time + maybe_record_enqueued!(t) enq_work(t) return t end @@ -1064,11 +1070,15 @@ immediately yields to `t` before calling the scheduler. Throws a `ConcurrencyViolationError` if `t` is the currently running task. """ function yield(t::Task, @nospecialize(x=nothing)) - current = current_task() - t === current && throw(ConcurrencyViolationError("Cannot yield to currently running task!")) + ct = current_task() + t === ct && throw(ConcurrencyViolationError("Cannot yield to currently running task!")) (t._state === task_state_runnable && t.queue === nothing) || throw(ConcurrencyViolationError("yield: Task not runnable")) + # [task] user_time -yield-> wait_time + record_running_time!(ct) + # [task] created -scheduled-> wait_time + maybe_record_enqueued!(t) t.result = x - enq_work(current) + enq_work(ct) set_next_task(t) return try_yieldto(ensure_rescheduled) end @@ -1082,6 +1092,7 @@ call to `yieldto`. This is a low-level call that only switches tasks, not consid or scheduling in any way. Its use is discouraged. """ function yieldto(t::Task, @nospecialize(x=nothing)) + ct = current_task() # TODO: these are legacy behaviors; these should perhaps be a scheduler # state error instead. if t._state === task_state_done @@ -1089,6 +1100,10 @@ function yieldto(t::Task, @nospecialize(x=nothing)) elseif t._state === task_state_failed throw(t.result) end + # [task] user_time -yield-> wait_time + record_running_time!(ct) + # [task] created -scheduled-unfairly-> wait_time + maybe_record_enqueued!(t) t.result = x set_next_task(t) return try_yieldto(identity) @@ -1102,6 +1117,10 @@ function try_yieldto(undo) rethrow() end ct = current_task() + # [task] wait_time -(re)started-> user_time + if ct.metrics_enabled + @atomic :monotonic ct.last_started_running_at = time_ns() + end if ct._isexception exc = ct.result ct.result = nothing @@ -1115,6 +1134,11 @@ end # yield to a task, throwing an exception in it function throwto(t::Task, @nospecialize exc) + ct = current_task() + # [task] user_time -yield-> wait_time + record_running_time!(ct) + # [task] created -scheduled-unfairly-> wait_time + maybe_record_enqueued!(t) t.result = exc t._isexception = true set_next_task(t) @@ -1167,6 +1191,9 @@ checktaskempty = Partr.multiq_check_empty end function wait() + ct = current_task() + # [task] user_time -yield-or-done-> wait_time + record_running_time!(ct) GC.safepoint() W = workqueue_for(Threads.threadid()) poptask(W) @@ -1181,3 +1208,21 @@ if Sys.iswindows() else pause() = ccall(:pause, Cvoid, ()) end + +# update the `running_time_ns` field of `t` to include the time since it last started running. +function record_running_time!(t::Task) + if t.metrics_enabled && !istaskdone(t) + @atomic :monotonic t.running_time_ns += time_ns() - t.last_started_running_at + end + return t +end + +# if this is the first time `t` has been added to the run queue +# (or the first time it has been unfairly yielded to without being added to the run queue) +# then set the `first_enqueued_at` field to the current time. +function maybe_record_enqueued!(t::Task) + if t.metrics_enabled && t.first_enqueued_at == 0 + @atomic :monotonic t.first_enqueued_at = time_ns() + end + return t +end diff --git a/doc/man/julia.1 b/doc/man/julia.1 index 56cb690d66eeb..2da11ae1b3f18 100644 --- a/doc/man/julia.1 +++ b/doc/man/julia.1 @@ -294,6 +294,10 @@ If --trace-compile is enabled show how long each took to compile in ms --trace-dispatch={stderr|name} Print precompile statements for methods dispatched during execution or save to stderr or a path. +.TP +--task-metrics={yes|no*} +Enable the collection of per-task metrics. + .TP -image-codegen Force generate code in imaging mode diff --git a/doc/src/base/multi-threading.md b/doc/src/base/multi-threading.md index 9e3bc49acf6dc..81d1d83d765ac 100644 --- a/doc/src/base/multi-threading.md +++ b/doc/src/base/multi-threading.md @@ -65,3 +65,11 @@ These building blocks are used to create the regular synchronization objects. ```@docs Base.Threads.SpinLock ``` + +## Task metrics (Experimental) + +```@docs +Base.Experimental.task_metrics +Base.Experimental.task_running_time_ns +Base.Experimental.task_wall_time_ns +``` diff --git a/doc/src/manual/command-line-interface.md b/doc/src/manual/command-line-interface.md index 734d7031db5e8..9b06deaf0ea8a 100644 --- a/doc/src/manual/command-line-interface.md +++ b/doc/src/manual/command-line-interface.md @@ -203,6 +203,7 @@ The following is a complete list of command-line switches available when launchi |`--code-coverage=tracefile.info` |Append coverage information to the LCOV tracefile (filename supports format tokens).| |`--track-allocation[={none*\|user\|all}]` |Count bytes allocated by each source line (omitting setting is equivalent to "user")| |`--track-allocation=@` |Count bytes but only in files that fall under the given file path/directory. The `@` prefix is required to select this option. A `@` with no path will track the current directory.| +|`--task-metrics={yes\|no*}` |Enable the collection of per-task metrics| |`--bug-report=KIND` |Launch a bug report session. It can be used to start a REPL, run a script, or evaluate expressions. It first tries to use BugReporting.jl installed in current environment and falls back to the latest compatible BugReporting.jl if not. For more information, see `--bug-report=help`.| |`--heap-size-hint=` |Forces garbage collection if memory usage is higher than the given value. The value may be specified as a number of bytes, optionally in units of KB, MB, GB, or TB, or as a percentage of physical memory with %.| |`--compile={yes*\|no\|all\|min}` |Enable or disable JIT compiler, or request exhaustive or minimal compilation| diff --git a/src/init.c b/src/init.c index 1cd14e8556cc6..7b41e63e98455 100644 --- a/src/init.c +++ b/src/init.c @@ -849,6 +849,10 @@ JL_DLLEXPORT void julia_init(JL_IMAGE_SEARCH rel) #if defined(_COMPILER_GCC_) && __GNUC__ >= 12 #pragma GCC diagnostic ignored "-Wdangling-pointer" #endif + if (jl_options.task_metrics == JL_OPTIONS_TASK_METRICS_ON) { + // enable before creating the root task so it gets timings too. + jl_atomic_fetch_add(&jl_task_metrics_enabled, 1); + } // warning: this changes `jl_current_task`, so be careful not to call that from this function jl_task_t *ct = jl_init_root_task(ptls, stack_lo, stack_hi); #pragma GCC diagnostic pop diff --git a/src/jlapi.c b/src/jlapi.c index a3621385a437e..defb2db6ac911 100644 --- a/src/jlapi.c +++ b/src/jlapi.c @@ -809,6 +809,28 @@ JL_DLLEXPORT uint64_t jl_cumulative_recompile_time_ns(void) return jl_atomic_load_relaxed(&jl_cumulative_recompile_time); } +/** + * @brief Enable per-task timing. + */ +JL_DLLEXPORT void jl_task_metrics_enable(void) +{ + // Increment the flag to allow reentrant callers. + jl_atomic_fetch_add(&jl_task_metrics_enabled, 1); +} + +/** + * @brief Disable per-task timing. + */ +JL_DLLEXPORT void jl_task_metrics_disable(void) +{ + // Prevent decrementing the counter below zero + uint8_t enabled = jl_atomic_load_relaxed(&jl_task_metrics_enabled); + while (enabled > 0) { + if (jl_atomic_cmpswap(&jl_task_metrics_enabled, &enabled, enabled-1)) + break; + } +} + /** * @brief Retrieve floating-point environment constants. * diff --git a/src/jloptions.c b/src/jloptions.c index f81cf0453db21..c68b5ce193d98 100644 --- a/src/jloptions.c +++ b/src/jloptions.c @@ -152,6 +152,7 @@ JL_DLLEXPORT void jl_init_options(void) 0, // heap-size-hint 0, // trace_compile_timing JL_TRIM_NO, // trim + 0, // task_metrics }; jl_options_initialized = 1; } @@ -316,6 +317,7 @@ static const char opts_hidden[] = " comment if color is not supported\n" " --trace-compile-timing If --trace-compile is enabled show how long each took to\n" " compile in ms\n" + " --task-metrics={yes|no*} Enable collection of per-task timing data.\n" " --image-codegen Force generate code in imaging mode\n" " --permalloc-pkgimg={yes|no*} Copy the data section of package images into memory\n" " --trim={no*|safe|unsafe|unsafe-warn}\n" @@ -347,6 +349,7 @@ JL_DLLEXPORT void jl_parse_opts(int *argcp, char ***argvp) opt_trace_compile, opt_trace_compile_timing, opt_trace_dispatch, + opt_task_metrics, opt_math_mode, opt_worker, opt_bind_to, @@ -427,6 +430,7 @@ JL_DLLEXPORT void jl_parse_opts(int *argcp, char ***argvp) { "trace-compile", required_argument, 0, opt_trace_compile }, { "trace-compile-timing", no_argument, 0, opt_trace_compile_timing }, { "trace-dispatch", required_argument, 0, opt_trace_dispatch }, + { "task-metrics", required_argument, 0, opt_task_metrics }, { "math-mode", required_argument, 0, opt_math_mode }, { "handle-signals", required_argument, 0, opt_handle_signals }, // hidden command line options @@ -978,6 +982,14 @@ JL_DLLEXPORT void jl_parse_opts(int *argcp, char ***argvp) else jl_errorf("julia: invalid argument to --trim={safe|no|unsafe|unsafe-warn} (%s)", optarg); break; + case opt_task_metrics: + if (!strcmp(optarg, "no")) + jl_options.task_metrics = JL_OPTIONS_TASK_METRICS_OFF; + else if (!strcmp(optarg, "yes")) + jl_options.task_metrics = JL_OPTIONS_TASK_METRICS_ON; + else + jl_errorf("julia: invalid argument to --task-metrics={yes|no} (%s)", optarg); + break; default: jl_errorf("julia: unhandled option -- %c\n" "This is a bug, please report it.", c); diff --git a/src/jloptions.h b/src/jloptions.h index b9910702f3f9b..211122242cbbd 100644 --- a/src/jloptions.h +++ b/src/jloptions.h @@ -65,6 +65,7 @@ typedef struct { uint64_t heap_size_hint; int8_t trace_compile_timing; int8_t trim; + int8_t task_metrics; } jl_options_t; #endif diff --git a/src/jltypes.c b/src/jltypes.c index 6c6325d84a5ff..abd1f62092035 100644 --- a/src/jltypes.c +++ b/src/jltypes.c @@ -3746,7 +3746,7 @@ void jl_init_types(void) JL_GC_DISABLED NULL, jl_any_type, jl_emptysvec, - jl_perm_symsvec(16, + jl_perm_symsvec(27, "next", "queue", "storage", @@ -3754,16 +3754,27 @@ void jl_init_types(void) JL_GC_DISABLED "result", "scope", "code", + "_state", + "sticky", + "priority", + "_isexception", + "pad00", + "pad01", + "pad02", "rngState0", "rngState1", "rngState2", "rngState3", "rngState4", - "_state", - "sticky", - "_isexception", - "priority"), - jl_svec(16, + "metrics_enabled", + "pad10", + "pad11", + "pad12", + "first_enqueued_at", + "last_started_running_at", + "running_time_ns", + "finished_at"), + jl_svec(27, jl_any_type, jl_any_type, jl_any_type, @@ -3771,21 +3782,36 @@ void jl_init_types(void) JL_GC_DISABLED jl_any_type, jl_any_type, jl_any_type, + jl_uint8_type, + jl_bool_type, + jl_uint16_type, + jl_bool_type, + jl_uint8_type, + jl_uint8_type, + jl_uint8_type, jl_uint64_type, jl_uint64_type, jl_uint64_type, jl_uint64_type, jl_uint64_type, - jl_uint8_type, jl_bool_type, - jl_bool_type, - jl_uint16_type), + jl_uint8_type, + jl_uint8_type, + jl_uint8_type, + jl_uint64_type, + jl_uint64_type, + jl_uint64_type, + jl_uint64_type), jl_emptysvec, 0, 1, 6); XX(task); jl_value_t *listt = jl_new_struct(jl_uniontype_type, jl_task_type, jl_nothing_type); jl_svecset(jl_task_type->types, 0, listt); - const static uint32_t task_atomicfields[1] = {0x00001000}; // Set fields 13 as atomic + // Set field 20 (metrics_enabled) as const + // Set fields 8 (_state) and 24-27 (metric counters) as atomic + const static uint32_t task_constfields[1] = { 0b00000000000010000000000000000000 }; + const static uint32_t task_atomicfields[1] = { 0b00000111100000000000000010000000 }; + jl_task_type->name->constfields = task_constfields; jl_task_type->name->atomicfields = task_atomicfields; tv = jl_svec2(tvar("A"), tvar("R")); diff --git a/src/julia.h b/src/julia.h index 6c0dd700f9472..a9864aad16ccc 100644 --- a/src/julia.h +++ b/src/julia.h @@ -2276,16 +2276,25 @@ typedef struct _jl_task_t { jl_value_t *result; jl_value_t *scope; jl_function_t *start; - // 4 byte padding on 32-bit systems - // uint32_t padding0; - uint64_t rngState[JL_RNG_SIZE]; _Atomic(uint8_t) _state; uint8_t sticky; // record whether this Task can be migrated to a new thread - _Atomic(uint8_t) _isexception; // set if `result` is an exception to throw or that we exited with - // 1 byte padding - // uint8_t padding1; - // multiqueue priority uint16_t priority; + _Atomic(uint8_t) _isexception; // set if `result` is an exception to throw or that we exited with + uint8_t pad0[3]; + // === 64 bytes (cache line) + uint64_t rngState[JL_RNG_SIZE]; + // flag indicating whether or not to record timing metrics for this task + uint8_t metrics_enabled; + uint8_t pad1[3]; + // timestamp this task first entered the run queue + _Atomic(uint64_t) first_enqueued_at; + // timestamp this task was most recently scheduled to run + _Atomic(uint64_t) last_started_running_at; + // time this task has spent running; updated when it yields or finishes. + _Atomic(uint64_t) running_time_ns; + // === 64 bytes (cache line) + // timestamp this task finished (i.e. entered state DONE or FAILED). + _Atomic(uint64_t) finished_at; // hidden state: // cached floating point environment @@ -2612,6 +2621,9 @@ JL_DLLEXPORT int jl_generating_output(void) JL_NOTSAFEPOINT; #define JL_TRIM_UNSAFE 2 #define JL_TRIM_UNSAFE_WARN 3 +#define JL_OPTIONS_TASK_METRICS_OFF 0 +#define JL_OPTIONS_TASK_METRICS_ON 1 + // Version information #include // Generated file diff --git a/src/julia_internal.h b/src/julia_internal.h index 2178f603441e0..4741316093f95 100644 --- a/src/julia_internal.h +++ b/src/julia_internal.h @@ -316,6 +316,9 @@ extern JL_DLLEXPORT _Atomic(uint8_t) jl_measure_compile_time_enabled; extern JL_DLLEXPORT _Atomic(uint64_t) jl_cumulative_compile_time; extern JL_DLLEXPORT _Atomic(uint64_t) jl_cumulative_recompile_time; +// Global *atomic* integer controlling *process-wide* task timing. +extern JL_DLLEXPORT _Atomic(uint8_t) jl_task_metrics_enabled; + #define jl_return_address() ((uintptr_t)__builtin_return_address(0)) STATIC_INLINE uint32_t jl_int32hash_fast(uint32_t a) diff --git a/src/task.c b/src/task.c index 5e1172a96a409..1a50d6fcbcf65 100644 --- a/src/task.c +++ b/src/task.c @@ -313,6 +313,13 @@ void JL_NORETURN jl_finish_task(jl_task_t *ct) { JL_PROBE_RT_FINISH_TASK(ct); JL_SIGATOMIC_BEGIN(); + if (ct->metrics_enabled) { + // [task] user_time -finished-> wait_time + assert(jl_atomic_load_relaxed(&ct->first_enqueued_at) != 0); + uint64_t now = jl_hrtime(); + jl_atomic_store_relaxed(&ct->finished_at, now); + jl_atomic_fetch_add_relaxed(&ct->running_time_ns, now - jl_atomic_load_relaxed(&ct->last_started_running_at)); + } if (jl_atomic_load_relaxed(&ct->_isexception)) jl_atomic_store_release(&ct->_state, JL_TASK_STATE_FAILED); else @@ -1146,6 +1153,11 @@ JL_DLLEXPORT jl_task_t *jl_new_task(jl_function_t *start, jl_value_t *completion t->ptls = NULL; t->world_age = ct->world_age; t->reentrant_timing = 0; + t->metrics_enabled = jl_atomic_load_relaxed(&jl_task_metrics_enabled) != 0; + jl_atomic_store_relaxed(&t->first_enqueued_at, 0); + jl_atomic_store_relaxed(&t->last_started_running_at, 0); + jl_atomic_store_relaxed(&t->running_time_ns, 0); + jl_atomic_store_relaxed(&t->finished_at, 0); jl_timing_task_init(t); if (t->ctx.copy_stack) @@ -1245,6 +1257,12 @@ CFI_NORETURN fesetenv(&ct->fenv); ct->ctx.started = 1; + if (ct->metrics_enabled) { + // [task] wait_time -started-> user_time + assert(jl_atomic_load_relaxed(&ct->first_enqueued_at) != 0); + assert(jl_atomic_load_relaxed(&ct->last_started_running_at) == 0); + jl_atomic_store_relaxed(&ct->last_started_running_at, jl_hrtime()); + } JL_PROBE_RT_START_TASK(ct); jl_timing_block_task_enter(ct, ptls, NULL); if (jl_atomic_load_relaxed(&ct->_isexception)) { @@ -1596,6 +1614,19 @@ jl_task_t *jl_init_root_task(jl_ptls_t ptls, void *stack_lo, void *stack_hi) ct->ptls = ptls; ct->world_age = 1; // OK to run Julia code on this task ct->reentrant_timing = 0; + jl_atomic_store_relaxed(&ct->running_time_ns, 0); + jl_atomic_store_relaxed(&ct->finished_at, 0); + ct->metrics_enabled = jl_atomic_load_relaxed(&jl_task_metrics_enabled) != 0; + if (ct->metrics_enabled) { + // [task] created -started-> user_time + uint64_t now = jl_hrtime(); + jl_atomic_store_relaxed(&ct->first_enqueued_at, now); + jl_atomic_store_relaxed(&ct->last_started_running_at, now); + } + else { + jl_atomic_store_relaxed(&ct->first_enqueued_at, 0); + jl_atomic_store_relaxed(&ct->last_started_running_at, 0); + } ptls->root_task = ct; jl_atomic_store_relaxed(&ptls->current_task, ct); JL_GC_PROMISE_ROOTED(ct); diff --git a/src/threading.c b/src/threading.c index 8f0dfb3330885..ac9cc276d613a 100644 --- a/src/threading.c +++ b/src/threading.c @@ -49,6 +49,8 @@ JL_DLLEXPORT _Atomic(uint8_t) jl_measure_compile_time_enabled = 0; JL_DLLEXPORT _Atomic(uint64_t) jl_cumulative_compile_time = 0; JL_DLLEXPORT _Atomic(uint64_t) jl_cumulative_recompile_time = 0; +JL_DLLEXPORT _Atomic(uint8_t) jl_task_metrics_enabled = 0; + JL_DLLEXPORT void *jl_get_ptls_states(void) { // mostly deprecated: use current_task instead diff --git a/test/cmdlineargs.jl b/test/cmdlineargs.jl index cc3f8950f0dc0..5df174694049d 100644 --- a/test/cmdlineargs.jl +++ b/test/cmdlineargs.jl @@ -783,6 +783,15 @@ let exename = `$(Base.julia_cmd()) --startup-file=no --color=no` "Int(Base.JLOptions().fast_math)"`)) == JL_OPTIONS_FAST_MATH_DEFAULT end + let JL_OPTIONS_TASK_METRICS_OFF = 0, JL_OPTIONS_TASK_METRICS_ON = 1 + @test parse(Int,readchomp(`$exename -E + "Int(Base.JLOptions().task_metrics)"`)) == JL_OPTIONS_TASK_METRICS_OFF + @test parse(Int, readchomp(`$exename --task-metrics=yes -E + "Int(Base.JLOptions().task_metrics)"`)) == JL_OPTIONS_TASK_METRICS_ON + @test !parse(Bool, readchomp(`$exename -E "current_task().metrics_enabled"`)) + @test parse(Bool, readchomp(`$exename --task-metrics=yes -E "current_task().metrics_enabled"`)) + end + # --worker takes default / custom as argument (default/custom arguments # tested in test/parallel.jl) @test errors_not_signals(`$exename --worker=true`) diff --git a/test/core.jl b/test/core.jl index 63952e8728e1e..ba1803a137392 100644 --- a/test/core.jl +++ b/test/core.jl @@ -25,6 +25,7 @@ for (T, c) in ( (TypeVar, [:name, :ub, :lb]), (Core.Memory, [:length, :ptr]), (Core.GenericMemoryRef, [:mem, :ptr_or_offset]), + (Task, [:metrics_enabled]), ) @test Set((fieldname(T, i) for i in 1:fieldcount(T) if isconst(T, i))) == Set(c) end @@ -42,7 +43,7 @@ for (T, c) in ( (DataType, [:types, :layout]), (Core.Memory, []), (Core.GenericMemoryRef, []), - (Task, [:_state]) + (Task, [:_state, :running_time_ns, :finished_at, :first_enqueued_at, :last_started_running_at]), ) @test Set((fieldname(T, i) for i in 1:fieldcount(T) if Base.isfieldatomic(T, i))) == Set(c) end diff --git a/test/threads_exec.jl b/test/threads_exec.jl index ac54dd009390c..d77cf06905f44 100644 --- a/test/threads_exec.jl +++ b/test/threads_exec.jl @@ -3,6 +3,7 @@ using Test using Base.Threads using Base.Threads: SpinLock, threadpoolsize +using LinearAlgebra: peakflops # for cfunction_closure include("testenv.jl") @@ -1312,4 +1313,227 @@ end end end end + +@testset "Base.Experimental.task_metrics" begin + t = Task(() -> nothing) + @test_throws "const field" t.metrics_enabled = true + is_task_metrics_enabled() = fetch(Threads.@spawn current_task().metrics_enabled) + @test !is_task_metrics_enabled() + try + @testset "once" begin + Base.Experimental.task_metrics(true) + @test is_task_metrics_enabled() + Base.Experimental.task_metrics(false) + @test !is_task_metrics_enabled() + end + @testset "multiple" begin + Base.Experimental.task_metrics(true) # 1 + Base.Experimental.task_metrics(true) # 2 + Base.Experimental.task_metrics(true) # 3 + @test is_task_metrics_enabled() + Base.Experimental.task_metrics(false) # 2 + @test is_task_metrics_enabled() + Base.Experimental.task_metrics(false) # 1 + @test is_task_metrics_enabled() + @sync for i in 1:5 # 0 (not negative) + Threads.@spawn Base.Experimental.task_metrics(false) + end + @test !is_task_metrics_enabled() + Base.Experimental.task_metrics(true) # 1 + @test is_task_metrics_enabled() + end + finally + while is_task_metrics_enabled() + Base.Experimental.task_metrics(false) + end + end +end + +@testset "task time counters" begin + @testset "enabled" begin + try + Base.Experimental.task_metrics(true) + start_time = time_ns() + t = Threads.@spawn peakflops() + wait(t) + end_time = time_ns() + wall_time_delta = end_time - start_time + @test t.metrics_enabled + @test Base.Experimental.task_running_time_ns(t) > 0 + @test Base.Experimental.task_wall_time_ns(t) > 0 + @test Base.Experimental.task_wall_time_ns(t) >= Base.Experimental.task_running_time_ns(t) + @test wall_time_delta > Base.Experimental.task_wall_time_ns(t) + finally + Base.Experimental.task_metrics(false) + end + end + @testset "disabled" begin + t = Threads.@spawn peakflops() + wait(t) + @test !t.metrics_enabled + @test isnothing(Base.Experimental.task_running_time_ns(t)) + @test isnothing(Base.Experimental.task_wall_time_ns(t)) + end + @testset "task not run" begin + t1 = Task(() -> nothing) + @test !t1.metrics_enabled + @test isnothing(Base.Experimental.task_running_time_ns(t1)) + @test isnothing(Base.Experimental.task_wall_time_ns(t1)) + try + Base.Experimental.task_metrics(true) + t2 = Task(() -> nothing) + @test t2.metrics_enabled + @test Base.Experimental.task_running_time_ns(t2) == 0 + @test Base.Experimental.task_wall_time_ns(t2) == 0 + finally + Base.Experimental.task_metrics(false) + end + end + @testset "task failure" begin + try + Base.Experimental.task_metrics(true) + t = Threads.@spawn error("this task failed") + @test_throws "this task failed" wait(t) + @test Base.Experimental.task_running_time_ns(t) > 0 + @test Base.Experimental.task_wall_time_ns(t) > 0 + @test Base.Experimental.task_wall_time_ns(t) >= Base.Experimental.task_running_time_ns(t) + finally + Base.Experimental.task_metrics(false) + end + end + @testset "direct yield(t)" begin + try + Base.Experimental.task_metrics(true) + start = time_ns() + t_outer = Threads.@spawn begin + t_inner = Task(() -> peakflops()) + t_inner.sticky = false + # directly yield to `t_inner` rather calling `schedule(t_inner)` + yield(t_inner) + wait(t_inner) + @test Base.Experimental.task_running_time_ns(t_inner) > 0 + @test Base.Experimental.task_wall_time_ns(t_inner) > 0 + @test Base.Experimental.task_wall_time_ns(t_inner) >= Base.Experimental.task_running_time_ns(t_inner) + end + wait(t_outer) + delta = time_ns() - start + @test Base.Experimental.task_running_time_ns(t_outer) > 0 + @test Base.Experimental.task_wall_time_ns(t_outer) > 0 + @test Base.Experimental.task_wall_time_ns(t_outer) >= Base.Experimental.task_running_time_ns(t_outer) + @test Base.Experimental.task_wall_time_ns(t_outer) < delta + finally + Base.Experimental.task_metrics(false) + end + end + @testset "bad schedule" begin + try + Base.Experimental.task_metrics(true) + t1 = Task((x) -> 1) + schedule(t1) # MethodError + yield() + @assert istaskfailed(t1) + @test Base.Experimental.task_running_time_ns(t1) > 0 + @test Base.Experimental.task_wall_time_ns(t1) > 0 + foo(a, b) = a + b + t2 = Task(() -> (peakflops(); foo(wait()))) + schedule(t2) + yield() + @assert istaskstarted(t1) && !istaskdone(t2) + schedule(t2, 1) + yield() + @assert istaskfailed(t2) + @test Base.Experimental.task_running_time_ns(t2) > 0 + @test Base.Experimental.task_wall_time_ns(t2) > 0 + finally + Base.Experimental.task_metrics(false) + end + end + @testset "continuously update until task done" begin + try + Base.Experimental.task_metrics(true) + last_running_time = Ref(typemax(Int)) + last_wall_time = Ref(typemax(Int)) + t = Threads.@spawn begin + running_time = Base.Experimental.task_running_time_ns() + wall_time = Base.Experimental.task_wall_time_ns() + for _ in 1:5 + x = time_ns() + while time_ns() < x + 100 + end + new_running_time = Base.Experimental.task_running_time_ns() + new_wall_time = Base.Experimental.task_wall_time_ns() + @test new_running_time > running_time + @test new_wall_time > wall_time + running_time = new_running_time + wall_time = new_wall_time + end + last_running_time[] = running_time + last_wall_time[] = wall_time + end + wait(t) + final_running_time = Base.Experimental.task_running_time_ns(t) + final_wall_time = Base.Experimental.task_wall_time_ns(t) + @test last_running_time[] < final_running_time + @test last_wall_time[] < final_wall_time + # ensure many more tasks are run to make sure the counters are + # not being updated after a task is done e.g. only when a new task is found + @sync for _ in 1:Threads.nthreads() + Threads.@spawn rand() + end + @test final_running_time == Base.Experimental.task_running_time_ns(t) + @test final_wall_time == Base.Experimental.task_wall_time_ns(t) + finally + Base.Experimental.task_metrics(false) + end + end +end + +@testset "task time counters: lots of spawns" begin + using Dates + try + Base.Experimental.task_metrics(true) + # create more tasks than we have threads. + # - all tasks must have: cpu time <= wall time + # - some tasks must have: cpu time < wall time + # - summing across all tasks we must have: total cpu time <= available cpu time + n_tasks = 2 * Threads.nthreads(:default) + cpu_times = Vector{UInt64}(undef, n_tasks) + wall_times = Vector{UInt64}(undef, n_tasks) + start_time = time_ns() + @sync begin + for i in 1:n_tasks + start_time_i = time_ns() + task_i = Threads.@spawn peakflops() + Threads.@spawn begin + wait(task_i) + end_time_i = time_ns() + wall_time_delta_i = end_time_i - start_time_i + cpu_times[$i] = cpu_time_i = Base.Experimental.task_running_time_ns(task_i) + wall_times[$i] = wall_time_i = Base.Experimental.task_wall_time_ns(task_i) + # task should have recorded some cpu-time and some wall-time + @test cpu_time_i > 0 + @test wall_time_i > 0 + # task cpu-time cannot be greater than its wall-time + @test wall_time_i >= cpu_time_i + # task wall-time must be less than our manually measured wall-time + # between calling `@spawn` and returning from `wait`. + @test wall_time_delta_i > wall_time_i + end + end + end + end_time = time_ns() + wall_time_delta = (end_time - start_time) + available_cpu_time = wall_time_delta * Threads.nthreads(:default) + summed_cpu_time = sum(cpu_times) + # total CPU time from all tasks can't exceed what was actually available. + @test available_cpu_time > summed_cpu_time + # some tasks must have cpu-time less than their wall-time, because we had more tasks + # than threads. + summed_wall_time = sum(wall_times) + @test summed_wall_time > summed_cpu_time + finally + Base.Experimental.task_metrics(false) + end +end + end # main testset