From 162f5da1a6b936998b59066fec0e02fe5c28fd20 Mon Sep 17 00:00:00 2001 From: "igor.khasilev" Date: Sun, 22 Sep 2019 23:55:58 +0300 Subject: [PATCH] many fixes and impromenets --- source/hio/socket/package.d | 417 +++++++++++++++++++++++++++++++----- 1 file changed, 358 insertions(+), 59 deletions(-) diff --git a/source/hio/socket/package.d b/source/hio/socket/package.d index 2eb6883..47d3767 100644 --- a/source/hio/socket/package.d +++ b/source/hio/socket/package.d @@ -27,6 +27,7 @@ import core.sys.posix.unistd; import core.sys.posix.arpa.inet; import core.sys.posix.netinet.tcp; import core.sys.posix.netinet.in_; +import core.sys.posix.sys.time : timeval; import core.sys.posix.fcntl; @@ -73,6 +74,12 @@ class ConnectionRefused : Exception { } } +class Timeout : Exception { + this(string msg, string file = __FILE__, size_t line = __LINE__) @safe { + super(msg, file, line); + } +} + bool isLinux() pure nothrow @nogc @safe { version(linux) { return true; @@ -166,6 +173,14 @@ class hlSocket : FileEventHandler { return _errno; } + void blocking(bool blocking) @property { + auto flags = () @trusted {return fcntl(_fileno, F_GETFL, 0);}(); + if ( blocking ) { + (() @trusted => fcntl(_fileno, F_SETFL, flags & ~O_NONBLOCK))(); + } else { + (() @trusted => fcntl(_fileno, F_SETFL, flags | O_NONBLOCK))(); + } + } void timeoutHandler(AppEvent e) @safe { debug { @@ -187,7 +202,15 @@ class hlSocket : FileEventHandler { _callback(e); return; case State.ACCEPTING: - assert(0); + debug tracef("accept timed out"); + _connected = false; + _errno = ETIMEDOUT; + _polling = AppEvent.NONE; + _loop.stopPoll(_fileno, AppEvent.IN); + _loop.detach(_fileno); + _state = State.IDLE; + _accept_callback(-1); + return; case State.IO: assert(0); } @@ -205,7 +228,7 @@ class hlSocket : FileEventHandler { if ( e & AppEvent.OUT ) { _connected = true; } - if ( (e & AppEvent.HUP) || isLinux ) { + if ( (e & AppEvent.HUP) ) { int err; uint err_s = err.sizeof; auto rc = (() @trusted => .getsockopt(_fileno, SOL_SOCKET, SO_ERROR, &err, &err_s))(); @@ -360,6 +383,94 @@ class hlSocket : FileEventHandler { loop.stopPoll(_fileno, _polling); } + private auto getSndRcvTimeouts() @safe { + + timeval sndtmo, rcvtmo; + socklen_t tmolen = sndtmo.sizeof; + auto rc = () @trusted { + return .getsockopt(_fileno, SOL_SOCKET, SO_SNDTIMEO, &sndtmo, &tmolen); + }(); + enforce(rc==0, "Failed to get sndtmo"); + debug tracef("got setsockopt sndtimeo: %d: %s", rc, sndtmo); + rc = () @trusted { + return .getsockopt(_fileno, SOL_SOCKET, SO_RCVTIMEO, &rcvtmo, &tmolen); + }(); + enforce(rc == 0, "Failed to get rcvtmo"); + debug tracef("got setsockopt sndtimeo: %d: %s", rc, rcvtmo); + return Tuple!(timeval, "sndtimeo", timeval, "rcvtimeo")(sndtmo, rcvtmo); + } + + private void setSndRcvTimeouts(Tuple!(timeval, "sndtimeo", timeval, "rcvtimeo") timeouts) @safe { + + timeval sndtmo = timeouts.sndtimeo, rcvtmo = timeouts.rcvtimeo; + socklen_t tmolen = sndtmo.sizeof; + + auto rc = () @trusted { + return .setsockopt(_fileno, SOL_SOCKET, SO_SNDTIMEO, &sndtmo, tmolen); + }(); + debug tracef("got setsockopt sndtimeo: %d: %s", rc, sndtmo); + rc = () @trusted { + return .setsockopt(_fileno, SOL_SOCKET, SO_RCVTIMEO, &rcvtmo, tmolen); + }(); + debug tracef("got setsockopt sndtimeo: %d: %s", rc, rcvtmo); + } + + private void setSndRcvTimeouts(Duration timeout) @safe { + timeval ntmo; + socklen_t stmo; + auto vals = timeout.split!("seconds", "usecs")(); + ntmo.tv_sec = cast(typeof(timeval.tv_sec)) vals.seconds; + ntmo.tv_usec = cast(typeof(timeval.tv_usec)) vals.usecs; + auto rc = () @trusted { + return .setsockopt(_fileno, SOL_SOCKET, SO_SNDTIMEO, &ntmo, ntmo.sizeof); + }(); + debug tracef("got setsockopt sndtimeo: %d: %s", rc, ntmo); + rc = () @trusted { + return .setsockopt(_fileno, SOL_SOCKET, SO_RCVTIMEO, &ntmo, ntmo.sizeof); + }(); + debug tracef("got setsockopt rcvtimeo: %d: %s", rc, ntmo); + } + + /// + /// connect synchronously (no loop, no fibers) + /// in blocked mode + /// + public bool connect(string addr, Duration timeout) @safe { + switch (_af) { + case AF_INET: { + import core.sys.posix.netinet.in_; + import core.sys.posix.sys.time: timeval; + // addr must be "host:port" + auto internet_addr = str2inetaddr(addr); + + // save old timeout values and set new + auto old_timeouts = getSndRcvTimeouts(); + setSndRcvTimeouts(timeout); + + sockaddr_in sin; + sin.sin_family = _af; + sin.sin_port = internet_addr[1]; + sin.sin_addr = in_addr(internet_addr[0]); + uint sa_len = sin.sizeof; + auto rc = (() @trusted => .connect(_fileno, cast(sockaddr*)&sin, sa_len))(); + auto connerrno = errno(); + + // restore timeouts + setSndRcvTimeouts(old_timeouts); + if (rc == -1 ) { + debug tracef("connect errno: %s %s", s_strerror(connerrno), sin); + _connected = false; + _state = State.IDLE; + return false; + } + _state = State.IDLE; + _connected = true; + return true; + } + default: + throw new SocketException("unsupported address family"); + } + } /// /// Return true if connect delayed /// @@ -378,7 +489,7 @@ class hlSocket : FileEventHandler { uint sa_len = sin.sizeof; auto rc = (() @trusted => .connect(_fileno, cast(sockaddr*)&sin, sa_len))(); if ( rc == -1 && errno() != EINPROGRESS ) { - debug infof("connect errno: %s", s_strerror(errno())); + debug tracef("connect errno: %s", s_strerror(errno())); _connected = false; _state = State.IDLE; f(AppEvent.ERR|AppEvent.IMMED); @@ -408,7 +519,7 @@ class hlSocket : FileEventHandler { uint sa_len = sockaddr.sizeof; auto rc = (() @trusted => .connect(_fileno, cast(sockaddr*)sin, sa_len))(); if (rc == -1 && errno() != EINPROGRESS) { - debug infof("connect errno: %s", s_strerror(errno())); + debug tracef("connect errno: %s", s_strerror(errno())); _connected = false; _state = State.IDLE; f(AppEvent.ERR | AppEvent.IMMED); @@ -429,12 +540,14 @@ class hlSocket : FileEventHandler { return true; } - public void accept(T)(hlEvLoop loop, T f) { + public void accept(T)(hlEvLoop loop, Duration timeout, T f) { _loop = loop; _accept_callback = f; // if ( _state != State.ACCEPTING ) { _state = State.ACCEPTING; _polling |= AppEvent.IN; + _connect_timer = new Timer(timeout, &timeoutHandler); + _loop.startTimer(_connect_timer); loop.startPoll(_fileno, AppEvent.IN, this); // } } @@ -520,7 +633,8 @@ class hlSocket : FileEventHandler { } auto rc = (() @trusted => .send(_fileno, &_iorq.output[0], _iorq.output.length, flags))(); if ( rc < 0 ) { - // error sending + // error sending XXX + assert(0); } _iorq.output = _iorq.output[rc..$]; if ( _iorq.output.length == 0 ) { @@ -542,6 +656,86 @@ class hlSocket : FileEventHandler { } } + /// + /// Make blocking IO without evelnt loop. + /// Can be called from non-fiber context + /// + /// return IOResult + /// + auto io(in IORequest iorq, in Duration timeout) @safe { + IOResult result; + + version (linux) { + immutable uint flags = MSG_NOSIGNAL; + } else { + immutable uint flags = 0; + + } + + auto old_timeouts = getSndRcvTimeouts(); + setSndRcvTimeouts(timeout); + + scope(exit) { + setSndRcvTimeouts(old_timeouts); + } + + debug tracef("Blocked io request %s", iorq); + + // handle requested output + result.output = iorq.output; + while(result.output.length > 0) { + auto rc = () @trusted { + return .send(_fileno, cast(void*)result.output.ptr, result.output.length, flags); + }(); + + if ( rc > 0 ) { + result.output = result.output[rc..$]; + } else { + result.error = true; + return result; + } + } + // handle requested input + size_t to_read = iorq.to_read; + if ( to_read > 0 ) { + ubyte[] buffer = new ubyte[](to_read); + size_t ptr, l; + + while(to_read>0) { + auto rc = () @trusted { + return .recv(_fileno, cast(void*)&buffer[ptr], to_read, 0); + }(); + debug tracef("got %d bytes to: %s", rc, buffer); + if ( rc == 0 || (rc > 0 && iorq.allowPartialInput) ) { + // client closed connection + l += rc; + buffer.length = l; + result.input = () @trusted {return assumeUnique(buffer);}(); + debug tracef("Blocked io returned %s", result); + return result; + } + if ( rc < 0 ) { + buffer.length = l; + result.error = true; + result.input = () @trusted { return assumeUnique(buffer); }(); + debug tracef("Blocked io returned %s", result); + return result; + } + to_read -= rc; + ptr += rc; + l += rc; + } + buffer.length = l; + result.input = () @trusted { return assumeUnique(buffer); }(); + debug tracef("Blocked io returned %s", result); + return result; + } + debug tracef("Blocked io returned %s", result); + return result; + } + /// + /// Make unblocked IO using loop + /// auto io(hlEvLoop loop, in IORequest iorq, in Duration timeout) @safe { AppEvent ev = AppEvent.NONE; @@ -612,7 +806,6 @@ class hlSocket : FileEventHandler { } rc = 0; // like we didn't sent anything } - enforce!SocketException(rc > 0, "send must not return 0"); data = data[rc..$]; result.output = data; if ( result.output.empty ) { @@ -808,6 +1001,17 @@ class HioSocket void connect(string addr, Duration timeout) @trusted { auto loop = getDefaultLoop(); _fiber = Fiber.getThis(); + if ( _fiber is null ) { + // we are not in context of any task, connect synchronously + // 1. set blocking mode, socket timeout + // 2. call connect, throw if faied + // 3. set unblocking mode + // 4. return + _socket.blocking = true; + _socket.connect(addr, timeout); + _socket.blocking = false; + return; + } void callback(AppEvent e) { if ( !(e & AppEvent.IMMED) ) { // we called yield @@ -818,11 +1022,11 @@ class HioSocket Fiber.yield(); } if ( _socket._errno == ECONNREFUSED ) { - throw new ConnectionRefused("Unabled to connect socket: connection refused on " ~ addr); + throw new ConnectionRefused("Unable to connect socket: connection refused on " ~ addr); } } /// - bool connected() @safe { + bool connected() const @safe { return _socket.connected; } /// @@ -841,14 +1045,19 @@ class HioSocket } } /// - auto accept() { + auto accept(Duration timeout = Duration.max) { HioSocket s; auto loop = getDefaultLoop(); _fiber = Fiber.getThis(); void callback(int fileno) @trusted { - assert(fileno >= 0); + debug tracef("Got %d on accept", fileno); + if ( fileno < 0 ) { + s = null; + _fiber.call(); + return; + } debug tracef("got accept callback for socket %d", fileno); if ( _socket._polling & AppEvent.IN ) { getDefaultLoop.stopPoll(_socket.fileno, AppEvent.IN); @@ -859,16 +1068,24 @@ class HioSocket _fiber.call(); } _socket._accepts_in_a_row = 1; - _socket.accept(loop, &callback); + _socket.accept(loop, timeout, &callback); Fiber.yield(); return s; } /// - IOResult recv(size_t n, Duration timeout = 10.seconds) { + IOResult recv(size_t n, Duration timeout = 10.seconds) @trusted { IORequest ioreq; IOResult iores; _fiber = Fiber.getThis(); + if ( _fiber is null) { + // read not in context of any fiber. Blocked read. + _socket.blocking = true; + ioreq.to_read = n; + iores = _socket.io(ioreq, timeout); + _socket.blocking = false; + return iores; + } void callback(IOResult ior) @trusted { debug tracef("got ior on recv: %s", ior); iores = ior; @@ -884,10 +1101,22 @@ class HioSocket return iores; } /// - size_t send(immutable (ubyte)[] data, Duration timeout = 1.seconds) { + size_t send(immutable (ubyte)[] data, Duration timeout = 1.seconds) @trusted { _fiber = Fiber.getThis(); IOResult ioresult; + if ( _fiber is null ) { + IORequest ioreq; + _socket.blocking = true; + ioreq.output = data; + ioresult = _socket.io(ioreq, timeout); + _socket.blocking = false; + if ( ioresult.error ) { + return -1; + } + return 0; + } + void callback(IOResult ior) @trusted { ioresult = ior; _fiber.call(); @@ -910,68 +1139,138 @@ class HioSocket return InputStream(this, t); } } + +unittest { + import core.thread; + import hio.scheduler; + + globalLogLevel = LogLevel.info; + void server(ushort port) { + auto s = new HioSocket(); + s.bind("127.0.0.1:%s".format(port)); + s.listen(); + auto c = s.accept(2.seconds); + if ( c is null ) { + s.close(); + throw new Exception("Accept failed"); + } + auto io = c.recv(64, 1.seconds); + assert(io.input == "hello".representation); + c.send("world".representation); + c.close(); + s.close(); + } + void client(ushort port) { + auto s = new HioSocket(); + s.connect("127.0.0.1:%d".format(port), 1.seconds); + scope(exit) { + s.close(); + } + if ( s.connected ) { + auto rq = s.send("hello".representation, 1.seconds); + auto rs = s.recv(64, 1.seconds); + assert(rs.input == "world".representation); + } else { + throw new Exception("Can't connect to server"); + } + } + + info("Test HioSockets 0"); + + // all ok case + auto t = new Thread({ + try{ + App(&server, cast(ushort)12345); + } catch (Exception e) { + infof("Got %s in server", e); + } + }).start; + Thread.sleep(500.msecs); + client(12345); + t.join; + + info("Test HioSockets 1"); + // all fail case - everything should throw + t = new Thread({ + App(&server, cast(ushort) 12345); + }).start; + assertThrown!Exception(client(12346)); + assertThrown!Exception(t.join); + + info("Test HioSockets 2"); + // the same but client in App + // all ok case + t = new Thread({ + App(&server, cast(ushort) 12345); + }).start; + Thread.sleep(500.msecs); + App(&client, cast(ushort)12345); + t.join; + + info("Test HioSockets 3"); + // all fail case - everything should throw + t = new Thread({ App(&server, cast(ushort) 12345); }).start; + assertThrown!Exception(App(&client, cast(ushort)12346)); + assertThrown!Exception(t.join); +} + /// struct ByLineSplitter(R) { + import nbuff; private { - R source; - ubyte[] data; - bool started; + enum NL = "\n".representation; + R source; + Buffer buff; + long last_position; + string line; } /// this(R r) { source = r; + popFront; } - - private void fill() { - if (started) { - source.popFront; - } - while (!source.empty) { - data ~= source.front; - if (data.countUntil('\n') >= 0) { - break; - } - source.popFront; - } - } - /// bool empty() { - return source.empty && data.length == 0; + return line is null && source.empty && buff.empty; } - /// string front() { - - auto i = countUntil(data, '\n'); - if (data.length == 0 || i < 0) { - fill(); - i = countUntil(data, '\n'); - } - started = true; - ubyte[] r; - if (i >= 0) { - r = data[0 .. i + 1]; - } - else { - r = data; - } - return assumeUTF(r); + return line; } - /// void popFront() { - auto i = countUntil(data, '\n'); - if (i >= 0) { - data = data[i + 1 .. $]; - return; - } - if (data.length == 0) { - return; + while (true) { + auto p = buff.countUntil(last_position, NL); + if (p >= 0) { + last_position = 0; + if ( p == 0 ) { + line = ""; + } else { + line = cast(string) buff[0 .. p].data; + } + buff = buff[p + 1 .. $]; + return; + } + last_position = buff.length; + if (source.empty) { + line = cast(string) buff[0 .. $].data; + buff = Buffer(); + return; + } + // fill from source and retry + buff.append(source.front); + source.popFront; } - errorf("Missed new line on <%s>", assumeUTF(data)); - assert(0, "Missed new line on <%s>".format(data)); } - } + /// auto byLineSplitter(R)(R r) { return ByLineSplitter!R(r); } + +unittest { + import std.stdio, std.range; + for(int s=1;s<7;s++) { + auto result = "a\nbb\n\n\nc\n\n".chunks(s) + .array.map!"to!string(a).representation".byLineSplitter; + assert(equal(result, ["a", "bb", "", "", "c", ""])); + } +}