Skip to content

Commit

Permalink
websocket: first implementation
Browse files Browse the repository at this point in the history
Signed-off-by: Francis Bouvier <[email protected]>
  • Loading branch information
francisbouvier committed Nov 27, 2024
1 parent 8f7a8c0 commit 325eced
Show file tree
Hide file tree
Showing 10 changed files with 196 additions and 120 deletions.
4 changes: 4 additions & 0 deletions .gitmodules
Original file line number Diff line number Diff line change
Expand Up @@ -28,3 +28,7 @@
[submodule "vendor/zig-async-io"]
path = vendor/zig-async-io
url = [email protected]:lightpanda-io/zig-async-io.git
[submodule "vendor/websocket.zig"]
path = vendor/websocket.zig
url = [email protected]:lightpanda-io/websocket.zig.git
branch = lightpanda
5 changes: 5 additions & 0 deletions build.zig
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,11 @@ fn common(
.root_source_file = b.path("vendor/tls.zig/src/main.zig"),
});
step.root_module.addImport("tls", tlsmod);

const wsmod = b.addModule("ws", .{
.root_source_file = b.path("vendor/websocket.zig/src/websocket.zig"),
});
step.root_module.addImport("websocket", wsmod);
}

fn moduleNetSurf(b: *std.Build, target: std.Build.ResolvedTarget) !*std.Build.Module {
Expand Down
2 changes: 1 addition & 1 deletion src/cdp/cdp.zig
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,7 @@ pub fn sendEvent(
const resp = Resp{ .method = name, .params = params, .sessionId = sessionID };

const event_msg = try stringify(alloc, resp);
try server.sendAsync(ctx, event_msg);
try ctx.send(event_msg);
}

// Common
Expand Down
2 changes: 1 addition & 1 deletion src/cdp/page.zig
Original file line number Diff line number Diff line change
Expand Up @@ -323,7 +323,7 @@ fn navigate(
.loaderId = ctx.state.loaderID,
};
const res = try result(alloc, input.id, Resp, resp, input.sessionId);
try server.sendAsync(ctx, res);
try ctx.send(res);

// TODO: at this point do we need async the following actions to be async?

Expand Down
4 changes: 2 additions & 2 deletions src/cdp/target.zig
Original file line number Diff line number Diff line change
Expand Up @@ -292,7 +292,7 @@ fn disposeBrowserContext(

// output
const res = try result(alloc, input.id, null, .{}, null);
try server.sendAsync(ctx, res);
try ctx.send(res);

return error.DisposeBrowserContext;
}
Expand Down Expand Up @@ -378,7 +378,7 @@ fn closeTarget(
success: bool = true,
};
const res = try result(alloc, input.id, Resp, Resp{}, null);
try server.sendAsync(ctx, res);
try ctx.send(res);

// Inspector.detached event
const InspectorDetached = struct {
Expand Down
83 changes: 83 additions & 0 deletions src/handler.zig
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
// Copyright (C) 2023-2024 Lightpanda (Selecy SAS)
//
// Francis Bouvier <[email protected]>
// Pierre Tachoire <[email protected]>
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as
// published by the Free Software Foundation, either version 3 of the
// License, or (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.

const std = @import("std");

const ws = @import("websocket");

const log = std.log.scoped(.handler);

pub const Stream = struct {
socket: std.posix.socket_t = undefined,
ws_conn: *ws.Conn = undefined,

fn connectCDP(self: *Stream) !void {
const address = try std.net.Address.parseIp("127.0.0.1", 3245);

const flags: u32 = std.posix.SOCK.STREAM;
const proto = std.posix.IPPROTO.TCP;
const socket = try std.posix.socket(address.any.family, flags, proto);

try std.posix.connect(
socket,
&address.any,
address.getOsSockLen(),
);
log.debug("connected to Stream server", .{});
self.socket = socket;
}

fn closeCDP(self: *const Stream) void {
std.posix.close(self.socket);
}

fn start(self: *Stream, ws_conn: *ws.Conn) !void {
try self.connectCDP();
self.ws_conn = ws_conn;
}

pub fn recv(self: *const Stream, data: []const u8) !void {
var pos: usize = 0;
while (pos < data.len) {
const len = try std.posix.write(self.socket, data[pos..]);
pos += len;
}
}

pub fn send(self: *const Stream, data: []const u8) !void {
return self.ws_conn.write(data);
}
};

pub const Handler = struct {
stream: *Stream,

pub fn init(_: ws.Handshake, ws_conn: *ws.Conn, stream: *Stream) !Handler {
try stream.start(ws_conn);
return .{ .stream = stream };
}

pub fn close(self: *Handler) void {
self.stream.closeCDP();
}

pub fn clientMessage(self: *Handler, alloc: std.mem.Allocator, data: []const u8) !void {
const msg = try std.fmt.allocPrint(alloc, "{d}:{s}", .{ data.len, data });
try self.stream.recv(msg);
}
};
150 changes: 41 additions & 109 deletions src/main.zig
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,14 @@
// along with this program. If not, see <https://www.gnu.org/licenses/>.

const std = @import("std");
const posix = std.posix;
const builtin = @import("builtin");

const jsruntime = @import("jsruntime");
const websocket = @import("websocket");

const Browser = @import("browser/browser.zig").Browser;
const server = @import("server.zig");
const handler = @import("handler.zig");

const parser = @import("netsurf");
const apiweb = @import("apiweb.zig");
Expand All @@ -32,102 +33,11 @@ pub const Types = jsruntime.reflect(apiweb.Interfaces);
pub const UserContext = apiweb.UserContext;
pub const IO = @import("asyncio").Wrapper(jsruntime.Loop);

const log = std.log.scoped(.cli);

// Inspired by std.net.StreamServer in Zig < 0.12
pub const StreamServer = struct {
/// Copied from `Options` on `init`.
kernel_backlog: u31,
reuse_address: bool,
reuse_port: bool,
nonblocking: bool,

/// `undefined` until `listen` returns successfully.
listen_address: std.net.Address,

sockfd: ?posix.socket_t,

pub const Options = struct {
/// How many connections the kernel will accept on the application's behalf.
/// If more than this many connections pool in the kernel, clients will start
/// seeing "Connection refused".
kernel_backlog: u31 = 128,

/// Enable SO.REUSEADDR on the socket.
reuse_address: bool = false,

/// Enable SO.REUSEPORT on the socket.
reuse_port: bool = false,

/// Non-blocking mode.
nonblocking: bool = false,
};

/// After this call succeeds, resources have been acquired and must
/// be released with `deinit`.
pub fn init(options: Options) StreamServer {
return StreamServer{
.sockfd = null,
.kernel_backlog = options.kernel_backlog,
.reuse_address = options.reuse_address,
.reuse_port = options.reuse_port,
.nonblocking = options.nonblocking,
.listen_address = undefined,
};
}

/// Release all resources. The `StreamServer` memory becomes `undefined`.
pub fn deinit(self: *StreamServer) void {
self.close();
self.* = undefined;
}

fn setSockOpt(fd: posix.socket_t, level: i32, option: u32, value: c_int) !void {
try posix.setsockopt(fd, level, option, &std.mem.toBytes(value));
}

pub fn listen(self: *StreamServer, address: std.net.Address) !void {
const sock_flags = posix.SOCK.STREAM | posix.SOCK.CLOEXEC;
var use_sock_flags: u32 = sock_flags;
if (self.nonblocking) use_sock_flags |= posix.SOCK.NONBLOCK;
const proto = if (address.any.family == posix.AF.UNIX) @as(u32, 0) else posix.IPPROTO.TCP;

const sockfd = try posix.socket(address.any.family, use_sock_flags, proto);
self.sockfd = sockfd;
errdefer {
posix.close(sockfd);
self.sockfd = null;
}
// Simple blocking websocket connection model
// ie. 1 thread per ws connection without thread pool and epoll/kqueue
pub const websocket_blocking = true;

// socket options
if (self.reuse_address) {
try setSockOpt(sockfd, posix.SOL.SOCKET, posix.SO.REUSEADDR, 1);
}
if (@hasDecl(posix.SO, "REUSEPORT") and self.reuse_port) {
try setSockOpt(sockfd, posix.SOL.SOCKET, posix.SO.REUSEPORT, 1);
}
if (builtin.target.os.tag == .linux) { // posix.TCP not available on MacOS
// WARNING: disable Nagle's alogrithm to avoid latency issues
try setSockOpt(sockfd, posix.IPPROTO.TCP, posix.TCP.NODELAY, 1);
}

var socklen = address.getOsSockLen();
try posix.bind(sockfd, &address.any, socklen);
try posix.listen(sockfd, self.kernel_backlog);
try posix.getsockname(sockfd, &self.listen_address.any, &socklen);
}

/// Stop listening. It is still necessary to call `deinit` after stopping listening.
/// Calling `deinit` will automatically call `close`. It is safe to call `close` when
/// not listening.
pub fn close(self: *StreamServer) void {
if (self.sockfd) |fd| {
posix.close(fd);
self.sockfd = null;
self.listen_address = undefined;
}
}
};
const log = std.log.scoped(.cli);

const usage =
\\usage: {s} [options] [URL]
Expand Down Expand Up @@ -319,27 +229,49 @@ pub fn main() !void {
switch (cli_mode) {
.server => |mode| {

// server
var srv = StreamServer.init(.{
.reuse_address = true,
.reuse_port = true,
.nonblocking = true,
});
defer srv.deinit();

srv.listen(mode.addr) catch |err| {
// Stream server
const socket = server.listen(mode.addr) catch |err| {
log.err("address (host:port) {any}\n", .{err});
return printUsageExit(mode.execname, 1);
};
defer srv.close();
log.info("Server mode: listening on {s}:{d}...", .{ mode.host, mode.port });
defer std.posix.close(socket);
log.debug("Server mode: listening internally on {s}:{d}...", .{ mode.host, mode.port });

var stream = handler.Stream{};

// loop
var loop = try jsruntime.Loop.init(alloc);
defer loop.deinit();

// listen
try server.listen(alloc, &loop, srv.sockfd.?, std.time.ns_per_s * @as(u64, mode.timeout));
// start stream server in separate thread
const cdp_thread = try std.Thread.spawn(
.{ .allocator = alloc },
server.handle,
.{
alloc,
&loop,
socket,
&stream,
std.time.ns_per_s * @as(u64, mode.timeout),
},
);

// Websocket server
var ws = try websocket.Server(handler.Handler).init(alloc, .{
.port = 9222,
.address = "127.0.0.1",
.handshake = .{
.timeout = 3,
.max_size = 1024,
// since we aren't using hanshake.headers
// we can set this to 0 to save a few bytes.
.max_headers = 0,
},
});
defer ws.deinit();

try ws.listen(&stream);
cdp_thread.join();
},

.fetch => |mode| {
Expand Down
5 changes: 4 additions & 1 deletion src/msg.zig
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,10 @@ pub const MsgBuffer = struct {
}

// copy the current input into MsgBuffer
@memcpy(self.buf[self.pos..new_pos], _input[0..]);
// NOTE: we could use @memcpy but it's not Thread-safe (alias problem)
// see https://www.openmymind.net/Zigs-memcpy-copyForwards-and-copyBackwards/
// Intead we just use std.mem.copyForwards
std.mem.copyForwards(u8, self.buf[self.pos..new_pos], _input[0..]);

// set the new cursor position
self.pos = new_pos;
Expand Down
Loading

0 comments on commit 325eced

Please sign in to comment.