Skip to content

Commit

Permalink
it works!
Browse files Browse the repository at this point in the history
  • Loading branch information
paperclover committed Sep 5, 2023
1 parent 0d625ee commit 9c63352
Show file tree
Hide file tree
Showing 12 changed files with 509 additions and 191 deletions.
1 change: 1 addition & 0 deletions packages/bun-usockets/src/libusockets.h
Original file line number Diff line number Diff line change
Expand Up @@ -385,6 +385,7 @@ void us_socket_remote_address(int ssl, struct us_socket_t *s, char *buf, int *le

/* Bun extras */
struct us_socket_t *us_socket_pair(struct us_socket_context_t *ctx, int socket_ext_size, LIBUS_SOCKET_DESCRIPTOR* fds);
struct us_socket_t *us_socket_from_fd(struct us_socket_context_t *ctx, int socket_ext_size, LIBUS_SOCKET_DESCRIPTOR fd);
struct us_socket_t *us_socket_detach(int ssl, struct us_socket_t *s);
struct us_socket_t *us_socket_attach(int ssl, LIBUS_SOCKET_DESCRIPTOR client_fd, struct us_socket_context_t *ctx, int flags, int socket_ext_size);
struct us_socket_t *us_socket_wrap_with_tls(int ssl, struct us_socket_t *s, struct us_bun_socket_context_options_t options, struct us_socket_events_t events, int socket_ext_size);
Expand Down
13 changes: 11 additions & 2 deletions packages/bun-usockets/src/socket.c
Original file line number Diff line number Diff line change
Expand Up @@ -201,8 +201,16 @@ struct us_socket_t *us_socket_pair(struct us_socket_context_t *ctx, int socket_e
return 0;
}

return us_socket_from_fd(ctx, socket_ext_size, fds[0]);
}


struct us_socket_t *us_socket_from_fd(struct us_socket_context_t *ctx, int socket_ext_size, LIBUS_SOCKET_DESCRIPTOR fd) {
#ifdef LIBUS_USE_LIBUV
return 0;
#endif
struct us_poll_t *p1 = us_create_poll(ctx->loop, 0, sizeof(struct us_socket_t) + socket_ext_size);
us_poll_init(p1, fds[0], POLL_TYPE_SOCKET);
us_poll_init(p1, fd, POLL_TYPE_SOCKET);
us_poll_start(p1, ctx->loop, LIBUS_SOCKET_READABLE | LIBUS_SOCKET_WRITABLE);

struct us_socket_t *s = (struct us_socket_t *) p1;
Expand All @@ -212,7 +220,7 @@ struct us_socket_t *us_socket_pair(struct us_socket_context_t *ctx, int socket_e
s->low_prio_state = 0;

/* We always use nodelay */
bsd_socket_nodelay(fds[0], 1);
bsd_socket_nodelay(fd, 1);

us_internal_socket_context_link_socket(ctx, s);

Expand All @@ -223,6 +231,7 @@ struct us_socket_t *us_socket_pair(struct us_socket_context_t *ctx, int socket_e
return s;
}


/* Not shared with SSL */

void *us_socket_get_native_handle(int ssl, struct us_socket_t *s) {
Expand Down
161 changes: 53 additions & 108 deletions src/bun.js/api/bun/subprocess.zig
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ const JSValue = JSC.JSValue;
const JSGlobalObject = JSC.JSGlobalObject;
const Which = @import("../../../which.zig");
const uws = @import("../../../deps/uws.zig");
const IPC = @import("../../ipc.zig");

pub const Subprocess = struct {
const log = Output.scoped(.Subprocess, false);
Expand Down Expand Up @@ -63,16 +64,17 @@ pub const Subprocess = struct {

ipc: IPCMode,
// this is only ever accessed when `ipc` is not `none`
ipc_socket: IPCSocket = undefined,
ipc_socket: IPC.Socket = undefined,
ipc_callback: JSC.Strong = .{},
ipc_buffer: bun.ByteList,

pub const SignalCode = bun.SignalCode;

pub const IPCMode = enum {
none,
///
bun,
// json,
advanced, // "advanced" in node uses v8 serialize, so this wont be compatible
};

pub fn hasExited(this: *const Subprocess) bool {
Expand All @@ -81,7 +83,7 @@ pub const Subprocess = struct {

pub fn updateHasPendingActivityFlag(this: *Subprocess) void {
@fence(.SeqCst);
this.has_pending_activity.store(this.waitpid_err == null and this.exit_code == null, .SeqCst);
this.has_pending_activity.store(this.waitpid_err == null and this.exit_code == null and this.ipc == .none, .SeqCst);
}

pub fn hasPendingActivity(this: *Subprocess) callconv(.C) bool {
Expand All @@ -91,7 +93,7 @@ pub const Subprocess = struct {

pub fn updateHasPendingActivity(this: *Subprocess) void {
@fence(.Release);
this.has_pending_activity.store(this.waitpid_err == null and this.exit_code == null, .Release);
this.has_pending_activity.store(this.waitpid_err == null and this.exit_code == null and this.ipc == .none, .Release);
}

pub fn ref(this: *Subprocess) void {
Expand Down Expand Up @@ -423,10 +425,6 @@ pub const Subprocess = struct {
return JSC.JSValue.jsUndefined();
}

/// This is used for Bun.spawn() IPC because otherwise we would have to copy the data once to get it to zig, then write it.
/// Returns `true` on success, `false` on failure + throws a JS error.
extern fn Bun__serializeJSValueForSubprocess(global: *JSC.JSGlobalObject, value: JSValue, fd: bun.FileDescriptor) bool;

pub fn doSend(this: *Subprocess, global: *JSC.JSGlobalObject, callFrame: *JSC.CallFrame) callconv(.C) JSValue {
if (this.ipc == .none) {
global.throw("Subprocess.send() can only be used if an IPC channel is open.", .{});
Expand All @@ -440,7 +438,7 @@ pub const Subprocess = struct {

const value = callFrame.argument(0);

const success = Bun__serializeJSValueForSubprocess(
const success = IPC.serializeJSValueForSubprocess(
global,
value,
this.ipc_socket.fd(),
Expand Down Expand Up @@ -1118,7 +1116,7 @@ pub const Subprocess = struct {
var cmd_value = JSValue.zero;
var detached = false;
var args = args_;
var ipc = IPCMode.none;
var ipc_mode = IPCMode.none;
var ipc_callback: JSValue = .zero;

{
Expand Down Expand Up @@ -1304,10 +1302,11 @@ pub const Subprocess = struct {
}
}

// TODO: We use this secret flag for IPC. see comment below.
if (args.get(globalThis, "onMessage")) |val| {
if (args.get(globalThis, "ipc")) |val| {
if (val.isCell() and val.isCallable(globalThis.vm())) {
ipc = .advanced;
// In the future, we should add a way to use a different IPC serialization format, specifically `json`.
// but the only use case this has is doing interop with node.js IPC and other programs.
ipc_mode = .bun;
ipc_callback = val.withAsyncContextIfNeeded(globalThis);
}
}
Expand Down Expand Up @@ -1389,12 +1388,6 @@ pub const Subprocess = struct {
return .zero;
};

env_array.append(allocator, null) catch {
globalThis.throw("out of memory", .{});
return .zero;
};
env = @as(@TypeOf(env), @ptrCast(env_array.items.ptr));

// IPC is currently implemented in a very limited way.
//
// Node lets you pass as many fds as you want, they all become be sockets; then, IPC is just a special
Expand All @@ -1407,16 +1400,15 @@ pub const Subprocess = struct {
// behavior, where this workaround suffices.
//
// When Bun.spawn() is given a `.onMessage` callback, it enables IPC as follows:
var socket: IPCSocket = undefined;
if (ipc != .none) {
var socket: IPC.Socket = undefined;
if (ipc_mode != .none) {
if (comptime is_sync) {
globalThis.throwInvalidArguments("IPC is not supported in Bun.spawnSync", .{});
return .zero;
}

env_array.ensureUnusedCapacity(allocator, 2) catch |err| return globalThis.handleError(err, "in posix_spawn");
env_array.appendAssumeCapacity("BUN_INTERNAL_IPC_FD=3");
env_array.appendAssumeCapacity("BUN_INTERNAL_IPC_MODE=advanced");

var fds: [2]uws.LIBUS_SOCKET_DESCRIPTOR = undefined;
socket = uws.newSocketFromPair(
Expand All @@ -1432,6 +1424,12 @@ pub const Subprocess = struct {
actions.dup2(fds[1], 3) catch |err| return globalThis.handleError(err, "in posix_spawn");
}

env_array.append(allocator, null) catch {
globalThis.throw("out of memory", .{});
return .zero;
};
env = @as(@TypeOf(env), @ptrCast(env_array.items.ptr));

const pid = brk: {
defer {
if (stdio[0].isPiped()) {
Expand Down Expand Up @@ -1501,13 +1499,13 @@ pub const Subprocess = struct {
.stderr = Readable.init(stdio[bun.STDERR_FD], stderr_pipe[0], jsc_vm.allocator, default_max_buffer_size),
.on_exit_callback = if (on_exit_callback != .zero) JSC.Strong.create(on_exit_callback, globalThis) else .{},
.is_sync = is_sync,
.ipc = ipc,
.ipc = ipc_mode,
// will be assigned in the block below
.ipc_socket = socket,
.ipc_buffer = bun.ByteList{},
.ipc_callback = if (ipc_callback != .zero) JSC.Strong.create(ipc_callback, globalThis) else undefined,
};
if (ipc != .none) {
if (ipc_mode != .none) {
var ptr = socket.ext(*Subprocess);
ptr.?.* = subprocess;
}
Expand Down Expand Up @@ -1980,91 +1978,38 @@ pub const Subprocess = struct {
return false;
}

pub const IPCSocket = uws.NewSocketHandler(false);

pub const IPCHandler = struct {
// ?! are ALL of these needed tbh
pub fn onOpen(
_: *Subprocess,
_: IPCSocket,
) void {}
pub fn onClose(
this: *Subprocess,
_: IPCSocket,
_: c_int,
_: ?*anyopaque,
) void {
// uSocket is already freed so calling .close() again can segfault
log("onClose", .{});
this.ipc = .none;
std.debug.print("onClose\n", .{});
}
pub fn onData(
this: *Subprocess,
_: IPCSocket,
buf: []const u8,
) void {
// log("onData", .{});
std.debug.print("onData: '{}'\n", .{std.fmt.fmtSliceHexLower(buf)});
if (this.ipc_buffer.len == 0) {
if (buf.len < 2) {
_ = this.ipc_buffer.write(bun.default_allocator, buf) catch @panic("OOM");
return;
}
// attempt to do everything without buffering
if (buf[0] != 1) {
// incorrect format
log("Invalid `type` byte {d}", .{buf[0]});
this.disconnect();
return;
}

const len = @as(u32, @intCast(buf[1]));
if (buf.len < len + 2) {
// not enough data
_ = this.ipc_buffer.write(bun.default_allocator, buf) catch @panic("OOM");
return;
}

const off = 1 + @sizeOf(u32);
const message = buf[off .. off + len];
const deserialized = JSValue.deserialize(message, this.globalThis);

if (deserialized != .zero) {
if (this.ipc_callback.get()) |cb| {
log("Calling callback with data", .{});
const result = cb.callWithThis(
this.globalThis,
this.this_jsvalue,
&[_]JSValue{deserialized},
);
// TODO: this can throw
if (result.isAnyError()) {
this.globalThis.bunVM().onUnhandledError(this.globalThis, result);
}
pub fn handleIPCMessage(
this: *Subprocess,
message: IPC.DecodedIPCMessage,
) void {
switch (message) {
// In future versions we can read this in order to detect version mismatches,
// or disable future optimizations if the subprocess is old.
.version => |v| {
IPC.log("Child IPC version is {d}", .{v});
},
.data => |data| {
IPC.log("Received IPC message from child", .{});
if (this.ipc_callback.get()) |cb| {
const result = cb.callWithThis(
this.globalThis,
this.this_jsvalue,
&[_]JSValue{data},
);
data.ensureStillAlive();
if (result.isAnyError()) {
this.globalThis.bunVM().onUnhandledError(this.globalThis, result);
}
}
return;
}
},
}
pub fn onWritable(
_: *Subprocess,
_: IPCSocket,
) void {}
pub fn onTimeout(
_: *Subprocess,
_: IPCSocket,
) void {}
pub fn onConnectError(
_: *Subprocess,
_: IPCSocket,
_: c_int,
) void {
// I don't think this is possible to hit.
}
pub fn onEnd(
_: *Subprocess,
_: IPCSocket,
) void {}
};
}

pub fn handleIPCClose(this: *Subprocess, _: IPC.Socket) void {
// uSocket is already freed so calling .close() on the socket can segfault
this.ipc = .none;
this.updateHasPendingActivity();
}

pub const IPCHandler = IPC.NewIPCHandler(Subprocess);
};
39 changes: 38 additions & 1 deletion src/bun.js/bindings/Process.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,10 @@ extern "C" uint8_t Bun__getExitCode(void*);
extern "C" uint8_t Bun__setExitCode(void*, uint8_t);
extern "C" void* Bun__getVM();
extern "C" Zig::GlobalObject* Bun__getDefaultGlobal();
extern "C" bool Bun__GlobalObject__hasIPC(JSGlobalObject*);
extern "C" const char* Bun__githubURL;
extern "C" JSC_DECLARE_HOST_FUNCTION(Bun__Process__send);
extern "C" JSC_DECLARE_HOST_FUNCTION(Bun__Process__disconnect);

static void dispatchExitInternal(JSC::JSGlobalObject* globalObject, Process* process, int exitCode)
{
Expand Down Expand Up @@ -279,7 +282,7 @@ JSC_DEFINE_HOST_FUNCTION(Process_functionDlopen,
}
}

JSC::EncodedJSValue (*napi_register_module_v1)(JSC::JSGlobalObject* globalObject,
JSC::EncodedJSValue (*napi_register_module_v1)(JSC::JSGlobalObject * globalObject,
JSC::EncodedJSValue exports);

napi_register_module_v1 = reinterpret_cast<JSC::EncodedJSValue (*)(JSC::JSGlobalObject*,
Expand Down Expand Up @@ -968,6 +971,26 @@ static JSValue constructStdin(VM& vm, JSObject* processObject)
RELEASE_AND_RETURN(scope, result);
}

static JSValue constructProcessSend(VM& vm, JSObject* processObject)
{
auto* globalObject = processObject->globalObject();
if (Bun__GlobalObject__hasIPC(globalObject)) {
return JSC::JSFunction::create(vm, globalObject, 1, String("send"_s), Bun__Process__send, ImplementationVisibility::Public);
} else {
return jsNumber(4);
}
}

static JSValue constructProcessDisconnect(VM& vm, JSObject* processObject)
{
auto* globalObject = processObject->globalObject();
if (Bun__GlobalObject__hasIPC(globalObject)) {
return JSC::JSFunction::create(vm, globalObject, 1, String("disconnect"_s), Bun__Process__disconnect, ImplementationVisibility::Public);
} else {
return jsUndefined();
}
}

static JSValue constructPid(VM& vm, JSObject* processObject)
{
return jsNumber(getpid());
Expand Down Expand Up @@ -1707,6 +1730,18 @@ JSC_DEFINE_HOST_FUNCTION(Process_functionKill,
return JSValue::encode(jsUndefined());
}

extern "C" void Process__emitMessageEvent(Zig::GlobalObject* global, EncodedJSValue value)
{
auto* process = static_cast<Process*>(global->processObject());
auto& vm = global->vm();
auto ident = Identifier::fromString(vm, "message"_s);
if (process->wrapped().hasEventListeners(ident)) {
JSC::MarkedArgumentBuffer args;
args.append(JSValue::decode(value));
process->wrapped().emit(ident, args);
}
}

/* Source for Process.lut.h
@begin processObjectTable
abort Process_functionAbort Function 1
Expand All @@ -1722,6 +1757,7 @@ JSC_DEFINE_HOST_FUNCTION(Process_functionKill,
cpuUsage Process_functionCpuUsage Function 1
cwd Process_functionCwd Function 1
debugPort processDebugPort CustomAccessor
disconnect constructProcessDisconnect PropertyCallback
dlopen Process_functionDlopen Function 1
emitWarning Process_emitWarning Function 1
env constructEnv PropertyCallback
Expand Down Expand Up @@ -1751,6 +1787,7 @@ JSC_DEFINE_HOST_FUNCTION(Process_functionKill,
release constructProcessReleaseObject PropertyCallback
revision constructRevision PropertyCallback
setSourceMapsEnabled Process_stubEmptyFunction Function 1
send constructProcessSend PropertyCallback
stderr constructStderr PropertyCallback
stdin constructStdin PropertyCallback
stdout constructStdout PropertyCallback
Expand Down
Loading

0 comments on commit 9c63352

Please sign in to comment.