diff --git a/spec/ysh-stdlib-sync.test.sh b/spec/ysh-stdlib-sync.test.sh new file mode 100644 index 0000000000..6be6b61a1a --- /dev/null +++ b/spec/ysh-stdlib-sync.test.sh @@ -0,0 +1,204 @@ +# spec/ysh-stdlib-sync + +## our_shell: ysh + +#### fifo pipe double closes +source --builtin draft-sync.ysh + +fifo-fd-new (&fd) +try { + fifo-fd-destroy (fd) +} +echo $_status +fifo-fd-destroy (fd) +## status: 1 +## STDOUT: +0 +## END + +#### semaphore syncrhonizing async jobs +source --builtin draft-sync.ysh + +sema-new (1, &s) +fork { + sleep 0.2 + sema-down (s) + echo 1 +} +fork { + sleep 0.4 + sema-down (s) + echo 2 +} +fork { + sleep 0.6 + sema-down (s) + echo 3 +} +sleep 0.8 +echo 4 +sema-up (s) +sleep 0.2 +echo 5 +sema-up (s) +sema-destroy (s) +## STDOUT: +1 +4 +2 +5 +3 +## END + +#### semaphore init with 3, async up once and multiple down +source --builtin draft-sync.ysh + +sema-new (3, &s) +fork { + sleep 0.2 + sema-up (s) +} +sema-down (s) +sema-down (s) +sema-down (s) +sema-down (s) +echo yes +## STDOUT: +yes +## END + +#### channel reads and writes +source --builtin draft-sync.ysh + +channel-new (&ch) + +for i in (0..4) { + fork { + for j in (0..4) { + echo $j | channel-in (ch) + } + } +} + +var sum = 0 +for i in (0..16) { + var cur = $(channel-out (ch)) => int() + setvar sum += cur +} + +echo $sum +channel-destroy (ch) +## STDOUT: +24 +## END + +#### channel but backed by blocked pipe +source --builtin draft-sync.ysh + +channel-new (&ch, __pipe_methods_blocked_netstring(4)) + +var sent = "I-am-a-pretty-damn-long-string-that-need-to-be-blocked" + +fork { + write -n -- "$sent" | channel-in (ch) +} + +var received = $(channel-out (ch)) +echo $[received === sent] +## STDOUT: +true +## END + +#### RWLock multiple shared lock and free, one exclusive lock +source --builtin draft-sync.ysh + +atom-new (&lk) + +fork { + atom-lock-shared (lk) + echo 1 + sleep 0.3 + atom-unlock (lk) +} +for _ in (0..3) { + fork { + sleep 0.1 + atom-lock-shared (lk) + echo 2 + sleep 0.2 + atom-unlock (lk) + } +} +sleep 0.1 +atom-lock-exclusive (lk) +echo 3 +atom-unlock (lk) +atom-destroy (lk) +## STDOUT: +1 +2 +2 +2 +3 +## END + +#### Reading and writing atom +source --builtin draft-sync.ysh + +atom-new (&l) +fork { + atom-lock-exclusive (l) + write -n 'w' | atom-write-in (l) + atom-unlock (l) +} +sleep 0.1 + +for _ in (0..3) { + fork { + atom-lock-shared (l) + atom-read-out (l) + sleep 0.2 + atom-read-out (l) + atom-unlock (l) + } +} +sleep 0.1 +write -n y +wait +atom-destroy (l) +write +## STDOUT: +wwwywww +## END + +#### Produce many value and exhaust the channel, and then reuse it +source --builtin draft-sync.ysh + +exh-channel-new (&ch) + +for i in (0..4) { + fork { + for j in (0..4) { + echo $j | exh-channel-in (ch) + } + } +} + +sleep 0.5 +exh-channel-exhaust (ch, &out) +var sum = 0 +for i in (out) { + setvar sum += i +} +echo $sum +# Reuses the channel +fork { + echo "yes!" | exh-channel-in (ch) +} +exh-channel-out (ch) +echo +exh-channel-destroy (ch) +## STDOUT: +24 +yes! +## END diff --git a/spec/ysh-stdlib-synch.test.sh b/spec/ysh-stdlib-synch.test.sh deleted file mode 100644 index a625b681e7..0000000000 --- a/spec/ysh-stdlib-synch.test.sh +++ /dev/null @@ -1,56 +0,0 @@ -# spec/ysh-stdlib - -## our_shell: ysh - -#### semaphore -source --builtin draft-synch.ysh - -sema-new (1, &s) -fork { - sleep 0.5 - sema-down (s) - echo 1 -} -fork { - sleep 1 - sema-down (s) - echo 2 -} -fork { - sleep 1.5 - sema-down (s) - echo 3 -} -sleep 2 -echo 4 -sema-up (s) -sleep 0.5 -echo 5 -sema-up (s) -sema-destroy (s) -## STDOUT: -1 -4 -2 -5 -3 -## END - -#### semaphore init and multiple down -source --builtin draft-synch.ysh - -sema-new (3, &s) -fork { - sleep 1 - sema-up (s) -} -sema-down (s) -sema-down (s) -sema-down (s) -sema-down (s) -echo yes -## STDOUT: -yes -## END - -# TODO: add test case for mutex and other sync primitives diff --git a/stdlib/draft-descriptor.ysh b/stdlib/draft-descriptor.ysh new file mode 100644 index 0000000000..85391f62d3 --- /dev/null +++ b/stdlib/draft-descriptor.ysh @@ -0,0 +1,89 @@ +#!/usr/bin/env ysh +# vim:foldmethod=marker + +module stdlib/descriptor || return 0 + +# General utilities for file descriptors {{{ + +# NOTE: we need a proper exception system. +func getFdFile(fd) { + var fd_file + try { + setvar fd_file = $(readlink "/proc/$BASHPID/fd/$fd") + } + if (_status === 0) { + return (fd_file) + } else { + { + echo "File descriptor $fd is not opened by process $BASHPID!" + } >&2 + exit 1 + } +} + +func getFdInfo(fd) { + var fd_info = "/proc/$BASHPID/fdinfo/$fd" + if test -f $fd_info { + return ($(cat $fd_info)) + } else { + { + echo "File descriptor $fd is not opened by process $BASHPID!" + } >&2 + exit 1 + } +} + +func getFdFlag(fd) { + var fd_info = getFdInfo(fd) + if (fd_info ~ / 'flags:' \t /) { + return (_match(1)) + } else { + { + echo "Can't find flags in for file descriptor $fd of process $BASHPID, here's the content:" + echo $fd_info + } >&2 + exit 1 + } +} + +# O_RDONLY (00), O_WRONLY (01) & O_RDWR (02) +func isFdRead(fd) { + # HACK: this take one octal number direct from the number string, + # since currently there's no way to convert number bits in std. + var fd_flag = getFdFlag(fd) + return (fd_flag[-1] === "0" or fd_flag[-1] === "2" ) +} + +func isFdWrite (fd) { + var fd_flag = getFdFlag(fd) + return (fd_flag[-1] === "1" or fd_flag[-1] === "2") +} + +# More strict than the standard version +proc fd-destroy-and-rm-file(fd) { + var fd_file = getFdFile(fd) + # NOTE: bash treat >&- and <&- the same, not sure for ysh's case + # REFERENCE: https://unix.stackexchange.com/questions/131801/closing-a-file-descriptor-vs + exec {fd}>&- + rm $fd_file +} + +# }}} +# FIFO File Descriptors {{{ + +proc fifo-fd-new(; out_fd) { + # WARN: this section should be critical but for now it's not + # A solution may be retry on fail. + #==================== + var fifo = $(mktemp -u) + mkfifo $fifo + #==================== + exec {fd}<>$fifo + call out_fd->setValue(fd) +} + +proc fifo-fd-destroy (; fd) { + fd-destroy-and-rm-file $fd +} + +# }}} diff --git a/stdlib/draft-pipe.ysh b/stdlib/draft-pipe.ysh new file mode 100644 index 0000000000..88f93338cc --- /dev/null +++ b/stdlib/draft-pipe.ysh @@ -0,0 +1,94 @@ +#!/usr/bin/env ysh +# vim:foldmethod=marker + +module stdlib/pipe || return 0 + +const __pipe_NUM_DELIM = u'$' + +# NOTE: I would love to optimize this a bit more, for example netstring of size n +# now takes log_10(n) over head. We can certainly do this better by byte encoding +# It would be log_128(n) (using 1 bit for indicating the number ends) +# That's a ln 128 / ln 10 which is roughly twice less overhead. +proc delim-num-pipe-in (; num) { + write -n -- "$num$__pipe_NUM_DELIM" +} + +proc delim-num-pipe-out (; out_num) { + var num_str = "" + while (true) { + read -n 1 next_char + if (next_char !== __pipe_NUM_DELIM) { + setvar num_str = num_str ++ next_char + } else { + break + } + } + call out_num->setValue(num_str => int()) +} + + +# Netstring {{{ +proc netstring-pipe-in () { + var msg = $(cat) # consume everything from input + delim-num-pipe-in (len(msg)) + write -n -- "$msg" +} + +proc netstring-pipe-out() { + delim-num-pipe-out (&msg_len) + read -n $msg_len msg + write -n -- "$msg" +} + +# NOTE: No way to refer to procs, I wrap them around funcs for now. +func __pipe_netstring_pipe_in_wrap() { + netstring-pipe-in +} +func __pipe_netstring_pipe_out_wrap() { + netstring-pipe-out +} +const Pipe_Methods_NetString = { + in_pipe: __pipe_netstring_pipe_in_wrap, + out_pipe: __pipe_netstring_pipe_out_wrap, +} +# }}} +# Blocked NetString {{{ +proc blocked-netstring-pipe-in (; block_size) { + while (true) { + var chunk + try { + read -n $block_size chunk + } + var last_chunk = (_status !== 0) + write -n -- "$chunk" | netstring-pipe-in + if (last_chunk) { + break + } + } +} + +proc blocked-netstring-pipe-out (; block_size) { + while (true) { + var chunk = $(netstring-pipe-out) + write -n -- "$chunk" + if (len(chunk) !== block_size) { + break + } + } +} + + +func __pipe_blocked_netstring_pipe_in_wrap(block_size) { + blocked-netstring-pipe-in (block_size) +} +func __pipe_blocked_pipe_out_wrap(block_size) { + blocked-netstring-pipe-out (block_size) +} + +func __pipe_methods_blocked_netstring(blocksize) { + return ({ + in_pipe: __pipe_netstring_pipe_in_wrap, + out_pipe: __pipe_netstring_pipe_out_wrap, + }) +} +# }}} diff --git a/stdlib/draft-sync.ysh b/stdlib/draft-sync.ysh new file mode 100644 index 0000000000..2123e4f6a5 --- /dev/null +++ b/stdlib/draft-sync.ysh @@ -0,0 +1,267 @@ +#!/usr/bin/env ysh +# vim:foldmethod=marker + +module stdlib/sync || return 0 +source --builtin draft-descriptor.ysh +source --builtin draft-pipe.ysh + +# Semaphores {{{ + +proc sema-new(; value, out_sema) { + fifo-fd-new (&sema) + sema-up (sema, value) + call out_sema->setValue(sema) +} + +proc sema-down(; sema) { + read <&$sema +} + +proc sema-up(; sema, delta = 1) { + fork { + for _ in (0 .. delta) { + echo >&$sema + } + } +} + +proc sema-destroy(; sema) { + fifo-fd-destroy (sema) +} + +# }}} +# Mutex {{{ + +proc mutex-new(; out_mutex) { + sema-new (1, out_mutex) +} + +proc mutex-acquire(; mutex) { + sema-down (mutex) +} + +proc mutex-release(; mutex) { + sema-up (mutex) +} + +proc mutex-destroy(; mutex) { + sema-destroy (mutex) +} + +# }}} +# Channels {{{ +# - Pipeline but you can send multiple objects across pipelines +# - Has to manually ensure the number of sent/received blocks are same across each side. +# - Blocks, no buffering +# - Backed by a modified net-string implementation +# - We may may modify this to replace with an implementation that allows streaming, however. + +proc channel-new(;out_chan, methods = null) { +# NOTE: Wait for dicts to be able to be set for default fields +# proc channel-new(;out_chan, methods = Pipe_Methods_NetString) { + if (methods === null) { + setvar methods = Pipe_Methods_NetString + } + mutex-new (&write_lock) + mutex-new (&read_lock) + fifo-fd-new (&pipe) + call out_chan->setValue({ + write_lock, + read_lock, + pipe, + methods, + }) +} + +proc channel-in(; chan) { + mutex-acquire (chan.write_lock) + { + call chan.methods.in_pipe() + } >&$[chan.pipe] + mutex-release (chan.write_lock) +} + +proc channel-out(; chan) { + mutex-acquire (chan.write_lock) + { + call chan.methods.out_pipe() + } <&$[chan.pipe] + mutex-release (chan.write_lock) +} + +proc channel-destroy(; chan) { + # Ensures no one else is working + mutex-acquire (chan.read_lock) + mutex-acquire (chan.write_lock) + # Clean up + fifo-fd-destroy (chan.pipe) + mutex-destroy (chan.write_lock) + mutex-destroy (chan.read_lock) +} + +# }}} +# Atom {{{ + +proc atom-new(; out_lock) { + var lockfile = $(mktemp) + var lock = { + fd: null, + lockfile, + } + call out_lock->setValue(lock) +} + +proc atom-read-out(; lock) { + if (isFdRead(lock.fd)) { + # TODO: ensure lock is held with correct permission + cat $[lock.lockfile] + } else { + echo "No rwlock held at $[lock.fd]" 1>&2 + exit 1 + } +} + +proc atom-write-in(; lock) { + if (isFdWrite(lock.fd)) { + # TODO: ensure lock is held with correct permission + cat > $[lock.lockfile] + } else { + echo "No exclusive lock held at rwlock $[lock.fd]" 1>&2 + exit 1 + } +} + +# NOTE: to change the type of a lock to T, e.g. from shared to exclusive, just call atom-{T} again. +proc atom-lock-shared(; lock) { + exec {lock_fd}<$[lock.lockfile] + setvar lock.fd = lock_fd + flock -s $[lock.fd] +} + +proc atom-lock-exclusive(; lock) { + exec {lock_fd}<>$[lock.lockfile] + setvar lock.fd = lock_fd + flock -x $[lock.fd] +} + +proc atom-unlock(; lock) { + var lock_fd_info + try { + setvar lock_fd_info = getFdInfo(lock.fd) + } + if (_status === 0) { + var lock_fd = lock.fd + flock -u $lock_fd + exec {lock_fd}<&- + } else { + echo "No rwlock held at $[lock.fd]" 1>&2 + exit 1 + } +} + +proc atom-destroy(; lock) { + atom-lock-exclusive (lock) + fd-destroy-and-rm-file $[lock.fd] +} + +proc atom-swap-fn(; lock, fn) { + atom-lock-exclusive (lock) + var swapped = $(atom-read-out (lock)) => fn() + write -n -- "$swapped" | atom-write-in (lock) + atom-unlock (lock) +} + +# TODO: Performs an lock-guarded write on a lock's file +# The issue is we need to know when to close the pipe +# proc atom-swap-pipe(; lock) + +# }}} +# Exhaustable Channels {{{ +# Channels but exhaustable + +proc exh-channel-new(; out_chan, methods = null) { + if (methods === null) { + setvar methods = Pipe_Methods_NetString + } + mutex-new (&write_lock) + mutex-new (&read_lock) + fifo-fd-new (&pipe) + # a counter on how many information we have on the pipe + atom-new (&message_count) + # a lock indicating if there's upcoming writes + atom-new (&will_write) + atom-lock-exclusive (message_count) + write -n -- 0 | atom-write-in (message_count) + atom-unlock (message_count) + call out_chan->setValue({ + write_lock, + read_lock, + pipe, + methods, + message_count, + will_write, + }) +} + +# TODO: when lambda landed, this can be refactor to a parse chained to an inc. +func __sync_exh_channel_inc_untyped (buf) { + var typed = buf => int() + var swapped = typed + 1 + var untyped = "$[swapped]" + return (untyped) +} + +proc exh-channel-in(; chan) { + atom-swap-fn (chan.message_count, __sync_exh_channel_inc_untyped) + atom-lock-shared (chan.will_write) + mutex-acquire (chan.write_lock) + { + call chan.methods.in_pipe() + } >&$[chan.pipe] + mutex-release (chan.write_lock) + atom-unlock (chan.will_write) +} + +proc __exh_channel_out_no_lock(; chan) { + { + call chan.methods.out_pipe() + } <&$[chan.pipe] +} + +proc exh-channel-out(; chan) { + mutex-acquire (chan.read_lock) + __exh_channel_out_no_lock (chan) + mutex-release (chan.read_lock) +} + +proc exh-channel-exhaust(; chan, out_ret) { + # No body should send anything to the channel from now on + atom-lock-exclusive (chan.will_write) + # Count how many message we have + atom-lock-exclusive (chan.message_count) + atom-read-out (chan.message_count) | json8 read (&num_msg) + write -n -- 0 | atom-write-in (chan.message_count) + atom-unlock (chan.message_count) + + var ret = [] + mutex-acquire (chan.read_lock) + for _ in (0..num_msg) { + call ret->append($(__exh_channel_out_no_lock (chan))) + } + mutex-release (chan.read_lock) + atom-unlock (chan.will_write) + call out_ret->setValue(ret) +} + +proc exh-channel-destroy(; chan) { + # Ensures no one else is working + mutex-acquire (chan.read_lock) + mutex-acquire (chan.write_lock) + # Clean up + fifo-fd-destroy (chan.pipe) + atom-destroy (chan.message_count) + atom-destroy (chan.will_write) + mutex-destroy (chan.write_lock) + mutex-destroy (chan.read_lock) +} +# }}} diff --git a/stdlib/draft-synch.ysh b/stdlib/draft-synch.ysh deleted file mode 100644 index 0120c3657c..0000000000 --- a/stdlib/draft-synch.ysh +++ /dev/null @@ -1,51 +0,0 @@ -#!/usr/bin/env ysh - -module stdlib/synch || return 0 - -############################ -### FIFO File Desriptors ### -############################ - -proc fifo-fd-new(; out_fd) { - # WARN: this section should be critical but for now it's not - # A solution may be retry on fail. - #==================== - var fifo = $(mktemp -u) - mkfifo $fifo - #==================== - exec {fd}<>$fifo - call out_fd->setValue(fd) -} - -proc fifo-fd-destroy(; fd) { - var fifoFile = $(readlink /proc/$$/fd/$fd) - exec {fd}>&- - exec {fd}<&- - rm $fifoFile -} - -################# -### Semaphore ### -################# - -proc sema-new(; value, out_sema) { - fifo-fd-new (&sema) - sema-up (sema, value) - call out_sema->setValue(sema) -} - -proc sema-down(; sema) { - read <&$sema -} - -proc sema-up(; sema, delta = 1) { - fork { - for _ in (0 .. delta) { - echo >&$sema - } - } -} - -proc sema-destroy(; sema) { - fifo-fd-destroy (sema) -} diff --git a/stdlib/math.ysh b/stdlib/math.ysh index ff8b5423e3..eec2c6aad0 100644 --- a/stdlib/math.ysh +++ b/stdlib/math.ysh @@ -77,3 +77,11 @@ func abs(x) { return (x) } } + +func inc(x) { + return (x + 1) +} + +func dec(x) { + return (x - 1) +} diff --git a/test/spec.sh b/test/spec.sh index be2d43b9b9..104d5111fd 100755 --- a/test/spec.sh +++ b/test/spec.sh @@ -692,8 +692,8 @@ ysh-stdlib-testing() { run-file ysh-stdlib-testing "$@" } -ysh-stdlib-synch() { - run-file ysh-stdlib-synch "$@" +ysh-stdlib-sync() { + run-file ysh-stdlib-sync "$@" } ysh-source() {