diff --git a/CHANGELOG.md b/CHANGELOG.md index 3a2e6f59..8b284bc7 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -35,6 +35,7 @@ * Fix `after_last(Str, Str)` * `encode_json()` - now supports `{'pretty': 'best-effort'}` hint * Various build improvements +* Fix edge case in `del(Hash, Any)` ### Deprecated * Deprecated `Deep` in favor of `AtPath` diff --git a/lib/autoload/globals/IO.ngs b/lib/autoload/globals/IO.ngs index 3ab12a74..38fdba7e 100644 --- a/lib/autoload/globals/IO.ngs +++ b/lib/autoload/globals/IO.ngs @@ -72,7 +72,7 @@ ns { debug('server', "JsonRpcServer Read") F send_error(code, message, data=null) { - error("JsonRpcServer on_data send_error -- ${code} ${message} ${data}") + error("JsonRpcServer on_data (or spawned thread) send_error -- ${code} ${message} ${data}") ret = { 'jsonrpc': '2.0' 'id': req.get('id') @@ -83,7 +83,6 @@ ns { }.filterv() } hc.fire(events::Write(ret.encode_json() + "\n")) - b.return() } req = {} # allow send_error() to do req.get('id') if we crash in try @@ -91,11 +90,18 @@ ns { data.decode_json() } catch(e:JsonDecodeFail) { send_error(-32700, 'Parse error', e.Hash().filterk(AnyOf('error', 'value', 'position'))) + b.return() } - if (req !~ _VALID_JSON_RPC_REQUEST) send_error(-32600, 'Invalid Request', 'See https://www.jsonrpc.org/specification') + if (req !~ _VALID_JSON_RPC_REQUEST) { + send_error(-32600, 'Invalid Request', 'See https://www.jsonrpc.org/specification') + b.return() + } - if (req.method not in jrs.methods) send_error(-32601, 'Method not found', "Method ${req.method} not found. Available methods: ${jrs.methods.keys()}") + if (req.method not in jrs.methods) { + send_error(-32601, 'Method not found', "Method ${req.method} not found. Available methods: ${jrs.methods.keys()}") + b.return() + } args = [] kwargs = {} @@ -106,26 +112,34 @@ ns { } } - debug('server', "JsonRpcServer on_data -- invoking ${req.method}") - result = try { - jrs.methods[req.method](*args, **kwargs) - } catch(mnf:MethodNotFound) { - guard mnf.callable === jrs.methods[req.method] - send_error(-32602, 'Invalid params', "Correct parameters can be seen in methods' descriptions: ${jrs.methods[req.method].Arr()}") - } catch(e:Error) { - send_error(-32603, 'Internal error', e.Str()) - } - - if 'id' in req { - # response expected - ret = { - 'jsonrpc': '2.0' - 'id': req.id - 'result': result - }.filterv() # Why filterv()? - debug('server', 'JsonRpcServer on_data -- sending reply') - hc.fire(events::Write(ret.encode_json() + "\n")) - } + debug('server', "JsonRpcServer on_data -- invoking ${req.method} in a thread") + + # TODO: join this thread + Thread(SeqId("JsonRpcServer-request-"), { + block b { + result = try { + jrs.methods[req.method](*args, **kwargs) + } catch(mnf:MethodNotFound) { + guard mnf.callable === jrs.methods[req.method] + send_error(-32602, 'Invalid params', "Correct parameters can be seen in methods' descriptions: ${jrs.methods[req.method].Arr()}") + b.return() + } catch(e:Error) { + send_error(-32603, 'Internal error', e.Str()) + b.return() + } + + if 'id' in req { + # response expected + ret = { + 'jsonrpc': '2.0' + 'id': req.id + 'result': result + }.filterv() # Why filterv()? + debug('server', 'JsonRpcServer on_data -- sending reply') + hc.fire(events::Write(ret.encode_json() + "\n")) + } + } + }) } # handle Read @@ -139,9 +153,16 @@ ns { F init(jrc:JsonRpcClient) { jrc.callbacks = {} jrc.id = Iter(1..null) + jrc.pipeline = null debug('client', 'JsonRpcClient()') } + F handle(jrc:JsonRpcClient, hc:COR::HandlerContext, a:events::Active) { + debug('client', 'JsonRpcClient Active') + jrc.pipeline = hc.pipeline + hc.fire(a) + } + F handle(jrc:JsonRpcClient, hc:COR::HandlerContext, r:events::Read) block b { data = r.data debug('client', "JsonRpcClient Read") @@ -149,7 +170,7 @@ ns { F handle(jrc:JsonRpcClient, hc:COR::HandlerContext, w:events::Write) block b { data = w.data - debug('client', "JsonRpcClient Write") + debug('client', "JsonRpcClient Write ${w.data.SafeStr()}") id = w.data.get('id', { jrc.id.next() }) w2 = events::Write({ 'jsonrpc': '2.0' @@ -166,7 +187,7 @@ ns { # TODO: make this more resilient F handle(jrc:JsonRpcClient, hc:COR::HandlerContext, r:events::Read) block b { data = r.data - debug('client', "JsonRpcClient Read ${data}") + debug('client', "JsonRpcClient Read ${data.SafeStr()}") res = try { data.decode_json() @@ -184,6 +205,40 @@ ns { ) } + # Is this the right way to do sync and async? + # TODO: what async should return? + global call + F call(jrc:JsonRpcClient, method:Str, params=[], callback:Fun=null) { + sync = not(callback) + + lock = Lock() + result = null + if sync { + lock.acquire() + callback = F json_rpc_client_sync_callback(data) { + # Runs in the reader thread + debug('client', 'JsonRpcClient callback') + result = data + lock.release() + } + } + + assert(jrc.pipeline, 'Before using JsonRpcClient#call, call fire(COR::Pipeline, IO::events::Active()) on the pipeline where JsonRpcClient were pushed') + jrc.pipeline.fire(events::Write({ + 'call': { + 'method': method + 'params': params + } + 'callback': callback + })) + + not(sync) returns + + debug('client', 'JsonRpcClient blocked on waiting for result') + lock.acquire().release() + debug('client', 'JsonRpcClient got result') + result + } } diff --git a/lib/autoload/globals/SeqId.ngs b/lib/autoload/globals/SeqId.ngs new file mode 100644 index 00000000..d6da0f40 --- /dev/null +++ b/lib/autoload/globals/SeqId.ngs @@ -0,0 +1,15 @@ +ns { + # TODO: more fine grained locking or lock-less mechanism + l = Lock() + ids = {} + + global SeqId + doc %STATUS - Experimental + F SeqId(s:Str) { + i = l.acquire({ + ids.dflt(s, 0) + ids[s] += 1 + }) + "${s}${i}" + } +} \ No newline at end of file diff --git a/lib/autoload/globals/Timeline.ngs b/lib/autoload/globals/Timeline.ngs index bed17f6b..6ea861df 100644 --- a/lib/autoload/globals/Timeline.ngs +++ b/lib/autoload/globals/Timeline.ngs @@ -2,7 +2,61 @@ ns { # WIP - timeline, mainly for UI - global Timeline, Time, Lines, init, each, map, sort, push, echo_cli + global Time, Lines, init, each, map, sort, push, echo_cli + + section "Item" { + + doc time - Time + type Item + + F init(ti:Item) throw InvalidArgument("Item() was called without arguments") + + F init(ti:Item, time:Time=null) { + ti.id = "ti-${`line: uuidgen`}" # temporary + ti.time = time + } + + F Time(ti:Item) ti.time + } + + section "GroupItem" { + + type GroupItem(Item) + + F init(ti:GroupItem) { + super(ti) + ti.items = [] + } + + F push(gti:GroupItem, ti:Item) gti::{ + if not(gti.items) { + gti.time = ti.time + } + gti.items.push(ti) + } + } + + section "ResultItem" { + + type ResultItem(Item) + + F init(ti:ResultItem, time:Time, result) { + super(ti, time) + ti.result = result + } + } + + section "TextualCommandItem" { + + type TextualCommandItem(Item) + + F init(ti:TextualCommandItem) throw InvalidArgument("TextualCommandItem() was called without arguments") + + F init(ti:TextualCommandItem, time:Time, command:Str) { + super(ti, time) + ti.command = command + } + } section "Timeline" { @@ -16,6 +70,7 @@ ns { F init(t:Timeline) { t.id = "tl-${`line: uuidgen`}" # temporary + t.version = 1 # TODO: make sure it's thread safe t.name = '(unnamed)' t.status = 'TODO' t.error = null @@ -32,11 +87,12 @@ ns { t.sort() } - F push(t:Timeline, x:Timeline) t::{ - A.items.push(x) + F push(t:Timeline, ti:Item) t::{ + A.items.push(ti) A.sort() # TODO: insert in the correct place, keeping .items sorted by time # TODO: maximize time_last_update and time_end into t # TODO: minimize time_start into t + t.version += 1 } F each(t:Timeline, cb:Fun) t::{A.items.each(cb)} diff --git a/lib/autoload/globals/net.ngs b/lib/autoload/globals/net.ngs index 42095bb4..a2677e1c 100644 --- a/lib/autoload/globals/net.ngs +++ b/lib/autoload/globals/net.ngs @@ -83,7 +83,7 @@ ns { global handle F handle(cpd:ConnectionPipelineDelegate, e:IO::events::Write) { - debug('server', "ConnectionPipelineDelegate Write") + debug('server', "ConnectionPipelineDelegate Write ${e.data.SafeStr()}") cpd.connection.send(e.data) } F handle(cpd:ConnectionPipelineDelegate, e:IO::events::Active) { @@ -155,7 +155,7 @@ ns { debug('server', 'ThreadedServerDelegate on_connect()') # TODO: join the thread / use Executor - Thread("connection-${c.id}", { + Thread("ThreadedServerDelegate-Connection-${c.id}", { try { debug('server', 'ThreadedServerDelegate on_listen() - new thread') c.pipeline.fire(IO::events::Active()) @@ -173,7 +173,7 @@ ns { } } catch(e) { debug('server', "ThreadedServerDelegate before logging") - log("ThreadedServerDelegate - exception in thread: ${e}") + log("ThreadedServerDelegate - exception in thread: ${Str(e) tor ''}") debug('server', "ThreadedServerDelegate after logging") guard false } @@ -254,20 +254,37 @@ ns { } section "UnixStreamClient" { + + block _ { + id = Iter(1..null) + global init + F init(usc:UnixStreamClient) { + super(usc) + usc.id = id.next() + } + } + F connect(usc:UnixStreamClient, path:Str, cd:ClientDelegate) { + debug('client', 'UnixStreamClient#connect') sock = socket(C_PF_UNIX, C_SOCK_STREAM) usc.sock = sock connect(sock, c_sockaddr_un(path)) c = Connection(usc, sock) + debug('client', 'UnixStreamClient will call on_connect') cd.on_connect(c) - while true { - data = recvfrom(sock, 1024, 0, c_sockaddr_un()) - if data == '' { - cd.on_remote_close() - break + usc.reader = Thread("UnixStreamClient-${usc.id}-receiver", { + while true { + debug('client', 'UnixStreamClient will recvfrom()') + data = recvfrom(sock, 1024, 0, c_sockaddr_un()) + debug('client', "UnixStreamClient did recvfrom() ${data.SafeStr()}") + if data == '' { + cd.on_remote_close() + break + } + c.pipeline.fire(IO::events::Read(data)) } - c.pipeline.fire(IO::events::Read(data)) - } + }) + usc } } } diff --git a/lib/autoload/globals/ui.ngs b/lib/autoload/globals/ui.ngs index 4d1bb990..e9e64c84 100644 --- a/lib/autoload/globals/ui.ngs +++ b/lib/autoload/globals/ui.ngs @@ -1,4 +1,4 @@ -ns { +ns(TL=Timeline) { # WIP global init, JsonData @@ -17,6 +17,11 @@ ns { type Element() type Screen(Element) + type Timeline(Element) + type TimelineItem(Element) + type GroupTimelineItem(TimelineItem) + type TextualCommandTimelineItem(TimelineItem) + type ResultTimelineItem(TimelineItem) type Object(Element) type Scalar(Element) @@ -85,6 +90,33 @@ ns { F keys_are_strings(h) h.keys().all(Str) F Element(h:AnyOf(Hash, HashLike)) Properties(h.assert(keys_are_strings, "Element(Hash) - keys must be strings").mapv(Element)) + F init(gti:GroupTimelineItem, id:Str, time:Time, items:Arr) { + gti.id = id + gti.time = time + super(gti, items) + } + + F Element(gti:TL::GroupItem) GroupTimelineItem(gti.id, gti.time, gti.items.map(Element)) + + F init(rti:ResultTimelineItem, id:Str, time:Time, result) { + rti.id = id + rti.time = time + super(rti, [Element(result)]) + } + + F Element(rti:TL::ResultItem) ResultTimelineItem(rti.id, rti.time, rti.result) + + F init(tcti:TextualCommandTimelineItem, id:Str, time:Time, command:Str) init(args()) + + F Element(tcti:TL::TextualCommandItem) TextualCommandTimelineItem(tcti.id, tcti.time, tcti.command) + + F init(tl:Timeline, id:Str, version, items:Arr) { + tl.id = id + tl.version = version + tl.children = items.map(Element) + } + + F Element(tl:TL::Timeline) Timeline(tl.id, tl.version, tl.items) # TODO: Fix later. It's semantically incorrect to display path as just a string F Element(p:Path) Scalar(p.path) diff --git a/lib/lang-tests.ngs b/lib/lang-tests.ngs index 8d9bd8ef..d9042603 100644 --- a/lib/lang-tests.ngs +++ b/lib/lang-tests.ngs @@ -51,6 +51,7 @@ TEST a=[1,2,3,4]; a[4..4]=["END"]; a[2..2]=["MIDDLE"]; a[0..0]=["START"]; a[0..1 # === Hash tests ================================= TEST h={"a": 1}; h.del("a"); h == {} +TEST h={"a": 1}; h.del("a"); h["b"]=2; h == {"b": 2} TEST h={}; try h.del("a") catch(e:KeyNotFound) true TEST h={"a": 1, "b": 2}; h.del("a"); h == {"b": 2} diff --git a/lib/shell.ngs b/lib/shell.ngs index 2ce22e53..5a56b96b 100644 --- a/lib/shell.ngs +++ b/lib/shell.ngs @@ -6,34 +6,104 @@ ns { WEB_SERVER_DIR = Dir(".." / "ngs-web-ui").assert() WEB_SERVER_PORT = 52000 + # Temporary name "repr" + F repr(h:Hash, level:Int=0) collector { + pfx = ' ' * level + h.each(F(k, v) { + if [k, v] =~ ['time', Int] { + v = "${v} (${v.Time().Str()})" + } + if k == 'children' { + v.each_idx_val(F(i, v) { + collect("${pfx} [${i}]") + repr(v, level + 2).each(collect) + }) + return + } + collect("${pfx}${k} = ${v.SafeStr()}") + }) + } + + section "Shell Client" { + type Client() + + + type ShellClientDelegate(net::ClientDelegate) + + global on_connect + F on_connect(scd:ShellClientDelegate, c:net::Connection) { + debug('client', 'ShellClientDelegate#on_connect()') + scd.jrc = IO::handlers::JsonRpcClient() + c.pipeline.push(IO::handlers::Splitter("\n")) + c.pipeline.push(scd.jrc) + c.pipeline.fire(IO::events::Active()) + } + + global init + F init(c:Client) { + c.last_seen_version = null + c.last_eval_result = null + c.scd = ShellClientDelegate() + c.client = net::UnixStreamClient() + c.client.connect(SOCK_FILE, c.scd) + } + + global eval + F eval(c:Client, cmd:Str) { + c.last_eval_result = c.scd.jrc.call('eval', [cmd])::{ + debug('client', {"Eval result ${A.SafeStr()}"}) + } + } + + global poll + F poll(c:Client, cb:Fun) { + while true { + debug('client', "Polling last_seen_version=${c.last_seen_version}") + p = c.scd.jrc.call('poll', { + 'last_seen_version': c.last_seen_version + }) + debug('client', "Poll result - timeline (ver. ${p.version})") + # echo(p.timeline.repr().Lines()) + c.last_seen_version = p.version + not(cb(p)) returns p + } + } + } + doc Rudimentary client as phase 1. - doc Not implemented yet. doc Send a command and poll for results. F eval(cmd:Str, exit:Bool=true) { # TODO: make this method less verbose by adding functionality to libraries type ShellClientDelegate(net::ClientDelegate) scd = ShellClientDelegate() + jrc = IO::handlers::JsonRpcClient() global on_connect F on_connect(scd:ShellClientDelegate, c:net::Connection) { debug('client', 'ShellClientDelegate#on_connect()') c.pipeline.push(IO::handlers::Splitter("\n")) - c.pipeline.push(IO::handlers::JsonRpcClient()) + c.pipeline.push(jrc) c.pipeline.fire(IO::events::Active()) - c.pipeline.fire(IO::events::Write({ - 'call': { - "method": "eval" - "params": [cmd] - } - 'callback': F(response) { - debug('client', "CLIENT RESPONSE ${response}") - } - })) } client = net::UnixStreamClient() client.connect(SOCK_FILE, scd) + + e = jrc.call('eval', [cmd]) + log("Eval result ${e.SafeStr()}") + + last_seen_version = null + while true { + log("Polling last_seen_version=${last_seen_version}") + p = jrc.call('poll', { + 'last_seen_version': last_seen_version + }) + log("Poll result - timeline (ver. ${p.version})") + echo(p.timeline.repr().Lines()) + last_seen_version = p.version + exit returns p + } } doc Starts shell server, JSON RPC proxy, and a browser client @@ -85,10 +155,15 @@ ns { } } - tl = Timeline() + tl = Timeline::Timeline() SHELL_METHODS = ns { F eval(line:Str) { + section "Append line to the Timeline" { + tc = Timeline::TextualCommandItem(Time(), line) + g = Timeline::GroupItem().push(tc) + tl.push(g) + } fname = '' section "Shortcuts" { @@ -102,19 +177,23 @@ ns { } line = shortcuts.get(line, line) } - bytecode = compile(line, fname) - # TODO: pass warnings - #bytecode.meta().warnings.each(F(w) { - # wl = w.location - # warn("${fname}:${wl.first_line}:${wl.first_column} warning: ${w.message}") - #}) - - func = load(bytecode, "") - result = func() - debug('server', "Result type ${result.Type().name}") - result = ui::Element(result) - debug('server', "Result type after transform() ${result.Type().name}") - result.JsonData()._censor() + + # TODO: join these sometimes + Thread(SeqId("shell-eval-"), { + debug('server', "Eval start: ${line}") + bytecode = compile(line, fname) + # TODO: pass warnings + #bytecode.meta().warnings.each(F(w) { + # wl = w.location + # warn("${fname}:${wl.first_line}:${wl.first_column} warning: ${w.message}") + #}) + func = load(bytecode, "") + result = func() + debug('server', "Eval end: ${line}") + g.push(Timeline::ResultItem(Time(), result)) + tl.version += 1 # FIXME: Flimsy + }) + {'ti': tc.id} # Timeline Item ID } # event example: {cur={type=AWS::CodePipeline::Pipeline, id=XXXX}, ref={type=AWS::CodePipeline::Execution, id=XXXX}} @@ -129,8 +208,23 @@ ns { } } - F poll() { - log("poll() - not implemented yet") + # TODO: handle broken pipe + F poll(last_seen_version) { + debug('client', "poll(last_seen_version=${last_seen_version}) - work in progress - returning the whole Timeline") + section "Wait for later version before retuning the result" block b { + not(last_seen_version) returns + while true { + if tl.version > last_seen_version { + b.return() + } + $(sleep 0.1) + } + } + result = { + 'timeline': ui::Element(tl) + 'version': tl.version + } + result.JsonData()._censor() } section "tests" { diff --git a/ngsfile b/ngsfile index 48017a45..3b7a460d 100755 --- a/ngsfile +++ b/ngsfile @@ -1,14 +1,93 @@ #!/usr/bin/env ngs -ns { - F server() { - require("./lib/shell.ngs")::server(false) - } +ns(t=test) { - F ui() { - require("./lib/shell.ngs")::server(true) - } + sh = require("./lib/shell.ngs") + + F server() sh::server(false) + F ui() sh::server(true) + + # "eval" would conflict with the global eval(c:Client, ...) + # which is called from tests below. + F _eval(cmd:Str, exit:Bool=true) sh::eval(cmd, exit) + _exports.eval = _eval + + F test() { + # Tests where the server and the client run in the same process + # fail sporadically. Therefore, running the server in external process. + # srv = Thread("test-server", { + # log("Starting server") + # server() + # }) + srv = $(ngs . server &) + $(sleep 1) + c = sh::Client() + + F make_textual_command_timeline_result_pattern(ti:Str, cmd:Str, pat) { + { + 'timeline': { + 'children': Present({ + '$type': 'GroupTimelineItem' + 'children': [{ + '$type': 'TextualCommandTimelineItem' + 'id': ti + 'command': cmd + }, { + '$type': 'ResultTimelineItem' + 'children': [ + pat + ] + }] + }) + } + } + } + + test_ti = null + t("basic eval", { + test_ti = c.eval("{1+1}").assert({ + 'ti': Pfx('ti-') + }).ti + }) + + t("poll after basic eval", { + c.poll(F(response) { + # The response was observed to be already there. I don't think it's guaranteed. May need refactoring. + assert(response, make_textual_command_timeline_result_pattern(test_ti, '{1+1}', { '$type': 'Scalar', 'value': 2 })) + false + }) + }) + + t("eval with sleep", { + # Valid JSON for decode_json() + test_ti = c.eval("sleep 2 | echo 3").assert({ + 'ti': Pfx('ti-') + }).ti + }) + + t("poll immediately after eval with sleep", { + c.poll(F(response) { + assert(response, { + 'timeline': { + 'children': Present({ + '$type': 'GroupTimelineItem' + 'children': [{ + '$type': 'TextualCommandTimelineItem' + 'id': test_ti + 'command': 'sleep 2 | echo 3' + }] + }) + } + }) + false + }) + }) - F eval(cmd:Str) { - require("./lib/shell.ngs")::eval(cmd) + t("poll later after eval with sleep", { + $(sleep 2) + c.poll(F(response) { + assert(response, make_textual_command_timeline_result_pattern(test_ti, 'sleep 2 | echo 3', { '$type': 'Scalar', 'value': 3 })) + false + }) + }) } } \ No newline at end of file diff --git a/obj.c b/obj.c index 4f907fb6..cea7ddb0 100644 --- a/obj.c +++ b/obj.c @@ -512,6 +512,7 @@ void resize_hash_for_new_len(VALUE h, RESIZE_HASH_AFTER after) { if(!new_buckets_n) { OBJ_DATA_PTR(h) = NULL; + HASH_BUCKETS_N(h) = new_buckets_n; return; }