Skip to content

Commit

Permalink
loop: Add IO methods
Browse files Browse the repository at this point in the history
Signed-off-by: Francis Bouvier <[email protected]>
  • Loading branch information
francisbouvier committed Nov 21, 2024
1 parent 2c66f41 commit d0e006b
Showing 1 changed file with 74 additions and 53 deletions.
127 changes: 74 additions & 53 deletions src/loop.zig
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,11 @@ pub const SingleThreaded = struct {
cbk_error: bool = false,

const Self = @This();
pub const Completion = IO.Completion;

pub const ConnectError = IO.ConnectError;
pub const RecvError = IO.RecvError;
pub const SendError = IO.SendError;

pub fn init(alloc: std.mem.Allocator) !Self {
const io = try alloc.create(IO);
Expand Down Expand Up @@ -97,8 +102,8 @@ pub const SingleThreaded = struct {
self.alloc.destroy(ctx);
}

// Callback-based APIs
// -------------------
// JS callbacks APIs
// -----------------

// Timeout

Expand Down Expand Up @@ -207,65 +212,81 @@ pub const SingleThreaded = struct {
}
}

// Network
pub fn Network(comptime Ctx: type) type {

// TODO check ctx interface funcs:
// - onConnect(ctx: *Ctx, ?anyerror) void
// - onReceive(ctx: *Ctx, usize, ?anyerror) void
// - onSend(ctx: *Ctx, usize, ?anyerror) void
// IO callbacks APIs
// -----------------

return struct {
const NetworkImpl = @This();
const Loop = Self;
// Connect

loop: *Loop,
ctx: *Ctx,
completion: IO.Completion,
pub fn connect(
self: *Self,
comptime Ctx: type,
ctx: *Ctx,
completion: *Completion,
comptime cbk: fn (ctx: *Ctx, _: *Completion, res: ConnectError!void) void,
socket: std.posix.socket_t,
address: std.net.Address,
) void {
const old_events_nb = self.addEvent();
self.io.connect(*Ctx, ctx, cbk, completion, socket, address);
if (builtin.is_test) {
report("start connect {d}", .{old_events_nb + 1});
}
}

pub fn init(loop: *Loop) NetworkImpl {
return .{
.loop = loop,
.completion = undefined,
.ctx = undefined,
};
}
pub fn onConnect(self: *Self, _: ConnectError!void) void {
const old_events_nb = self.removeEvent();
if (builtin.is_test) {
report("connect done, remaining events: {d}", .{old_events_nb - 1});
}
}

pub fn connect(self: *NetworkImpl, ctx: *Ctx, socket: std.posix.socket_t, address: std.net.Address) void {
self.ctx = ctx;
_ = self.loop.addEvent();
self.loop.io.connect(*NetworkImpl, self, NetworkImpl.connectCbk, &self.completion, socket, address);
}
// Send

fn connectCbk(self: *NetworkImpl, _: *IO.Completion, result: IO.ConnectError!void) void {
_ = self.loop.removeEvent();
_ = result catch |err| return self.ctx.onConnect(err);
return self.ctx.onConnect(null);
}
pub fn send(
self: *Self,
comptime Ctx: type,
ctx: *Ctx,
completion: *Completion,
comptime cbk: fn (ctx: *Ctx, completion: *Completion, res: SendError!usize) void,
socket: std.posix.socket_t,
buf: []const u8,
) void {
const old_events_nb = self.addEvent();
self.io.send(*Ctx, ctx, cbk, completion, socket, buf);
if (builtin.is_test) {
report("start send {d}", .{old_events_nb + 1});
}
}

pub fn receive(self: *NetworkImpl, ctx: *Ctx, socket: std.posix.socket_t, buffer: []u8) void {
self.ctx = ctx;
_ = self.loop.addEvent();
self.loop.io.recv(*NetworkImpl, self, NetworkImpl.receiveCbk, &self.completion, socket, buffer);
}
pub fn onSend(self: *Self, _: SendError!usize) void {
const old_events_nb = self.removeEvent();
if (builtin.is_test) {
report("send done, remaining events: {d}", .{old_events_nb - 1});
}
}

fn receiveCbk(self: *NetworkImpl, _: *IO.Completion, result: IO.RecvError!usize) void {
_ = self.loop.removeEvent();
const ln = result catch |err| return self.ctx.onReceive(0, err);
return self.ctx.onReceive(ln, null);
}
// Recv

pub fn send(self: *NetworkImpl, ctx: *Ctx, socket: std.posix.socket_t, buffer: []const u8) void {
self.ctx = ctx;
_ = self.loop.addEvent();
self.loop.io.send(*NetworkImpl, self, NetworkImpl.sendCbk, &self.completion, socket, buffer);
}
pub fn recv(
self: *Self,
comptime Ctx: type,
ctx: *Ctx,
completion: *Completion,
comptime cbk: fn (ctx: *Ctx, completion: *Completion, res: RecvError!usize) void,
socket: std.posix.socket_t,
buf: []u8,
) void {
const old_events_nb = self.addEvent();
self.io.recv(*Ctx, ctx, cbk, completion, socket, buf);
if (builtin.is_test) {
report("start recv {d}", .{old_events_nb + 1});
}
}

fn sendCbk(self: *NetworkImpl, _: *IO.Completion, result: IO.SendError!usize) void {
_ = self.loop.removeEvent();
const ln = result catch |err| return self.ctx.onSend(0, err);
return self.ctx.onSend(ln, null);
}
};
pub fn onRecv(self: *Self, _: RecvError!usize) void {
const old_events_nb = self.removeEvent();
if (builtin.is_test) {
report("recv done, remaining events: {d}", .{old_events_nb - 1});
}
}
};

0 comments on commit d0e006b

Please sign in to comment.