Skip to content

Commit

Permalink
dfghjk
Browse files Browse the repository at this point in the history
  • Loading branch information
paperclover committed Sep 4, 2023
1 parent 70c5fba commit 0d625ee
Show file tree
Hide file tree
Showing 8 changed files with 73 additions and 54 deletions.
2 changes: 0 additions & 2 deletions packages/bun-usockets/src/socket.c
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,6 @@
#include <string.h>
#include <stdint.h>

#include <stdio.h>

/* Shared with SSL */

int us_socket_local_port(int ssl, struct us_socket_t *s) {
Expand Down
103 changes: 60 additions & 43 deletions src/bun.js/api/bun/subprocess.zig
Original file line number Diff line number Diff line change
Expand Up @@ -452,7 +452,7 @@ pub const Subprocess = struct {

pub fn disconnect(this: *Subprocess) void {
if (this.ipc == .none) return;
this.ipc_socket.close();
this.ipc_socket.close(0, null);
this.ipc = .none;
}

Expand Down Expand Up @@ -1120,7 +1120,6 @@ pub const Subprocess = struct {
var args = args_;
var ipc = IPCMode.none;
var ipc_callback: JSValue = .zero;
_ = ipc_callback;

{
if (args.isEmptyOrUndefinedOrNull()) {
Expand Down Expand Up @@ -1309,6 +1308,7 @@ pub const Subprocess = struct {
if (args.get(globalThis, "onMessage")) |val| {
if (val.isCell() and val.isCallable(globalThis.vm())) {
ipc = .advanced;
ipc_callback = val.withAsyncContextIfNeeded(globalThis);
}
}
}
Expand Down Expand Up @@ -1349,40 +1349,6 @@ pub const Subprocess = struct {
env_array.capacity = env_array.items.len;
}

// IPC is currently implemented in a very limited way.
//
// Node lets you pass as many fds as you want, they all become be sockets; then, IPC is just a special
// runtime-owned version of "pipe" (in which pipe is a misleading name since they're bidirectional sockets).
//
// Bun currently only supports three fds: stdin, stdout, and stderr, which are all unidirectional
//
// And then fd 3 is assigned specifically and only for IPC. This is quite lame, because Node.js allows
// the ipc fd to be any number and it just works. But most people only care about the default `.fork()`
// behavior, where this workaround suffices.
//
// When Bun.spawn() is given a `.onMessage` callback, it enables IPC as follows:
var socket: uws.SocketTCP = undefined;
if (ipc != .none) {
if (comptime is_sync) {
globalThis.throwInvalidArguments("IPC is not supported in Bun.spawnSync", .{});
return .zero;
}

env_array.ensureUnusedCapacity(allocator, 2) catch |err| return globalThis.handleError(err, "in posix_spawn");
env_array.appendAssumeCapacity("BUN_INTERNAL_IPC_FD=3");
env_array.appendAssumeCapacity("BUN_INTERNAL_IPC_MODE=advanced");

var fds: [2]uws.LIBUS_SOCKET_DESCRIPTOR = undefined;
socket = uws.newSocketFromPair(jsc_vm.rareData().spawnIPCContext(jsc_vm), &fds) orelse {
globalThis.throw("failed to create socket pair: E{s}", .{
@tagName(bun.sys.getErrno(-1)),
});
return .zero;
};

actions.dup2(fds[1], 3) catch |err| return globalThis.handleError(err, "in posix_spawn");
}

const stdin_pipe = if (stdio[0].isPiped()) os.pipe2(0) catch |err| {
globalThis.throw("failed to create stdin pipe: {s}", .{@errorName(err)});
return .zero;
Expand Down Expand Up @@ -1429,6 +1395,43 @@ pub const Subprocess = struct {
};
env = @as(@TypeOf(env), @ptrCast(env_array.items.ptr));

// IPC is currently implemented in a very limited way.
//
// Node lets you pass as many fds as you want, they all become be sockets; then, IPC is just a special
// runtime-owned version of "pipe" (in which pipe is a misleading name since they're bidirectional sockets).
//
// Bun currently only supports three fds: stdin, stdout, and stderr, which are all unidirectional
//
// And then fd 3 is assigned specifically and only for IPC. This is quite lame, because Node.js allows
// the ipc fd to be any number and it just works. But most people only care about the default `.fork()`
// behavior, where this workaround suffices.
//
// When Bun.spawn() is given a `.onMessage` callback, it enables IPC as follows:
var socket: IPCSocket = undefined;
if (ipc != .none) {
if (comptime is_sync) {
globalThis.throwInvalidArguments("IPC is not supported in Bun.spawnSync", .{});
return .zero;
}

env_array.ensureUnusedCapacity(allocator, 2) catch |err| return globalThis.handleError(err, "in posix_spawn");
env_array.appendAssumeCapacity("BUN_INTERNAL_IPC_FD=3");
env_array.appendAssumeCapacity("BUN_INTERNAL_IPC_MODE=advanced");

var fds: [2]uws.LIBUS_SOCKET_DESCRIPTOR = undefined;
socket = uws.newSocketFromPair(
jsc_vm.rareData().spawnIPCContext(jsc_vm),
@sizeOf(*Subprocess),
&fds,
) orelse {
globalThis.throw("failed to create socket pair: E{s}", .{
@tagName(bun.sys.getErrno(-1)),
});
return .zero;
};
actions.dup2(fds[1], 3) catch |err| return globalThis.handleError(err, "in posix_spawn");
}

const pid = brk: {
defer {
if (stdio[0].isPiped()) {
Expand Down Expand Up @@ -1484,7 +1487,6 @@ pub const Subprocess = struct {
globalThis.throw("out of memory", .{});
return .zero;
};

// When run synchronously, subprocess isn't garbage collected
subprocess.* = Subprocess{
.globalThis = globalThis,
Expand All @@ -1500,9 +1502,15 @@ pub const Subprocess = struct {
.on_exit_callback = if (on_exit_callback != .zero) JSC.Strong.create(on_exit_callback, globalThis) else .{},
.is_sync = is_sync,
.ipc = ipc,
// will be assigned in the block below
.ipc_socket = socket,
.ipc_buffer = bun.ByteList{},
.ipc_callback = if (ipc_callback != .zero) JSC.Strong.create(ipc_callback, globalThis) else undefined,
};
if (ipc != .none) {
var ptr = socket.ext(*Subprocess);
ptr.?.* = subprocess;
}

if (subprocess.stdin == .pipe) {
subprocess.stdin.pipe.signal = JSC.WebCore.Signal.init(&subprocess.stdin);
Expand Down Expand Up @@ -1998,7 +2006,11 @@ pub const Subprocess = struct {
) void {
// log("onData", .{});
std.debug.print("onData: '{}'\n", .{std.fmt.fmtSliceHexLower(buf)});
if (this.ipc_buffer.len == 0 and buf.len >= 2) {
if (this.ipc_buffer.len == 0) {
if (buf.len < 2) {
_ = this.ipc_buffer.write(bun.default_allocator, buf) catch @panic("OOM");
return;
}
// attempt to do everything without buffering
if (buf[0] != 1) {
// incorrect format
Expand All @@ -2010,24 +2022,29 @@ pub const Subprocess = struct {
const len = @as(u32, @intCast(buf[1]));
if (buf.len < len + 2) {
// not enough data
log("Not enough data yet {d}, {d}", .{ buf.len, len + 2 });
this.ipc_buffer.write(bun.default_allocator, buf);
_ = this.ipc_buffer.write(bun.default_allocator, buf) catch @panic("OOM");
return;
}

const message = buf[2 .. 2 + len];
const deserialized = JSValue.deserialize(message);
const off = 1 + @sizeOf(u32);
const message = buf[off .. off + len];
const deserialized = JSValue.deserialize(message, this.globalThis);

if (deserialized != .zero) {
if (this.ipc_callback.get()) |cb| {
log("Calling callback with data", .{});
cb.callWithThis(
const result = cb.callWithThis(
this.globalThis,
this.this_jsvalue,
&[_]JSValue{deserialized},
);
// TODO: this can throw
if (result.isAnyError()) {
this.globalThis.bunVM().onUnhandledError(this.globalThis, result);
}
}
}
return;
}
}
pub fn onWritable(
Expand Down
2 changes: 1 addition & 1 deletion src/bun.js/bindings/Serialization.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -45,5 +45,5 @@ extern "C" EncodedJSValue Bun__JSValue__deserialize(JSGlobalObject* globalObject
Vector<uint8_t> vector(bytes, size);
/// ?! did i just give ownership of these bytes to JSC?
auto scriptValue = SerializedScriptValue::createFromWireBytes(WTFMove(vector));
return JSValue::encode(jsUndefined());
return JSValue::encode(scriptValue->deserialize(*globalObject, globalObject));
}
4 changes: 2 additions & 2 deletions src/bun.js/bindings/bindings.zig
Original file line number Diff line number Diff line change
Expand Up @@ -4914,11 +4914,11 @@ pub const JSValue = enum(JSValueReprInt) {
return AsyncContextFrame__withAsyncContextIfNeeded(global, this);
}

extern "c" fn Bun__JSValue__deserialize(global: *JSGlobalObject, data: [*]const u8, len: usize) JSValue;
extern "c" fn Bun__JSValue__deserialize(global: *JSGlobalObject, data: [*]const u8, len: isize) JSValue;

/// Deserializes a JSValue from a serialized buffer. Zig version of `import('bun:jsc').deserialize`
pub inline fn deserialize(bytes: []const u8, global: *JSGlobalObject) JSValue {
return Bun__JSValue__deserialize(global, bytes.ptr, bytes.len);
return Bun__JSValue__deserialize(global, bytes.ptr, @intCast(bytes.len));
}
};

Expand Down
2 changes: 1 addition & 1 deletion src/bun.js/rare_data.zig
Original file line number Diff line number Diff line change
Expand Up @@ -319,7 +319,7 @@ pub fn spawnIPCContext(rare: *RareData, vm: *JSC.VirtualMachine) *uws.SocketCont

var opts: uws.us_socket_context_options_t = .{};
const ctx = uws.us_create_socket_context(0, vm.event_loop_handle.?, @sizeOf(usize), opts).?;
Subprocess.IPCSocket.configure(ctx, false, Subprocess, Subprocess.IPCHandler);
Subprocess.IPCSocket.configure(ctx, true, *Subprocess, Subprocess.IPCHandler);
rare.spawn_ipc_usockets_context = ctx;
return ctx;
}
Expand Down
4 changes: 2 additions & 2 deletions src/deps/uws.zig
Original file line number Diff line number Diff line change
Expand Up @@ -2264,8 +2264,8 @@ extern fn us_socket_pair(
fds: *[2]LIBUS_SOCKET_DESCRIPTOR,
) ?*Socket;

pub fn newSocketFromPair(ctx: *SocketContext, fds: *[2]LIBUS_SOCKET_DESCRIPTOR) ?SocketTCP {
pub fn newSocketFromPair(ctx: *SocketContext, ext_size: c_int, fds: *[2]LIBUS_SOCKET_DESCRIPTOR) ?SocketTCP {
return SocketTCP{
.socket = us_socket_pair(ctx, 0, fds) orelse return null,
.socket = us_socket_pair(ctx, ext_size, fds) orelse return null,
};
}
4 changes: 4 additions & 0 deletions src/js/node/child_process.js
Original file line number Diff line number Diff line change
Expand Up @@ -1199,6 +1199,10 @@ class ChildProcess extends EventEmitter {
}
}

#emitIpcMessage(message) {
this.emit("message", message);
}

#send(message, handle, options, callback) {
if (typeof handle === "function") {
callback = handle;
Expand Down
6 changes: 3 additions & 3 deletions src/js/out/InternalModuleRegistryConstants.h

Large diffs are not rendered by default.

0 comments on commit 0d625ee

Please sign in to comment.