Skip to content

Commit

Permalink
Merge pull request #102 from novalabsxyz/adt/chatterbox-update
Browse files Browse the repository at this point in the history
Update grpcbox for chatterbox performance work
  • Loading branch information
tsloughter authored Jun 14, 2023
2 parents 855947e + 2ef767c commit da0ab58
Show file tree
Hide file tree
Showing 6 changed files with 35 additions and 37 deletions.
2 changes: 1 addition & 1 deletion rebar.config
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{erl_opts, [debug_info]}.

{deps, [{chatterbox, {pkg, ts_chatterbox}},
{deps, [{chatterbox, ".*", {git, "https://github.com/tsloughter/chatterbox", {branch, "master"}}},
ctx,
acceptor_pool,
gproc]}.
Expand Down
19 changes: 10 additions & 9 deletions rebar.lock
Original file line number Diff line number Diff line change
@@ -1,20 +1,21 @@
{"1.2.0",
[{<<"acceptor_pool">>,{pkg,<<"acceptor_pool">>,<<"1.0.0">>},0},
{<<"chatterbox">>,{pkg,<<"ts_chatterbox">>,<<"0.13.0">>},0},
{<<"chatterbox">>,
{git,"https://github.com/tsloughter/chatterbox",
{ref,"2c595c14c4b378d1a3c01f11a1a3e001fd8834c1"}},
0},
{<<"ctx">>,{pkg,<<"ctx">>,<<"0.6.0">>},0},
{<<"gproc">>,{pkg,<<"gproc">>,<<"0.8.0">>},0},
{<<"hpack">>,{pkg,<<"hpack_erl">>,<<"0.2.3">>},1}]}.
{<<"gproc">>,{pkg,<<"gproc">>,<<"0.9.1">>},0},
{<<"hpack">>,{pkg,<<"hpack_erl">>,<<"0.3.0">>},1}]}.
[
{pkg_hash,[
{<<"acceptor_pool">>, <<"43C20D2ACAE35F0C2BCD64F9D2BDE267E459F0F3FD23DAB26485BF518C281B21">>},
{<<"chatterbox">>, <<"6F059D97BCAA758B8EA6FFFE2B3B81362BD06B639D3EA2BB088335511D691EBF">>},
{<<"ctx">>, <<"8FF88B70E6400C4DF90142E7F130625B82086077A45364A78D208ED3ED53C7FE">>},
{<<"gproc">>, <<"CEA02C578589C61E5341FCE149EA36CCEF236CC2ECAC8691FBA408E7EA77EC2F">>},
{<<"hpack">>, <<"17670F83FF984AE6CD74B1C456EDDE906D27FF013740EE4D9EFAA4F1BF999633">>}]},
{<<"gproc">>, <<"F1DF0364423539CF0B80E8201C8B1839E229E5F9B3CCB944C5834626998F5B8C">>},
{<<"hpack">>, <<"2461899CC4AB6A0EF8E970C1661C5FC6A52D3C25580BC6DD204F84CE94669926">>}]},
{pkg_hash_ext,[
{<<"acceptor_pool">>, <<"0CBCD83FDC8B9AD2EEE2067EF8B91A14858A5883CB7CD800E6FCD5803E158788">>},
{<<"chatterbox">>, <<"B93D19104D86AF0B3F2566C4CBA2A57D2E06D103728246BA1AC6C3C0FF010AA7">>},
{<<"ctx">>, <<"A14ED2D1B67723DBEBBE423B28D7615EB0BDCBA6FF28F2D1F1B0A7E1D4AA5FC2">>},
{<<"gproc">>, <<"580ADAFA56463B75263EF5A5DF4C86AF321F68694E7786CB057FD805D1E2A7DE">>},
{<<"hpack">>, <<"06F580167C4B8B8A6429040DF36CC93BBA6D571FAEAEC1B28816523379CBB23A">>}]}
{<<"gproc">>, <<"905088E32E72127ED9466F0BAC0D8E65704CA5E73EE5A62CB073C3117916D507">>},
{<<"hpack">>, <<"D6137D7079169D8C485C6962DFE261AF5B9EF60FBC557344511C1E65E3D95FB0">>}]}
].
3 changes: 1 addition & 2 deletions src/grpcbox_client_stream.erl
Original file line number Diff line number Diff line change
Expand Up @@ -82,11 +82,10 @@ send_request(Ctx, Channel, Path, Input, #grpcbox_def{service=Service,
buffer => <<>>,
stats_handler => StatsHandler,
stats => #{},
client_pid => self()}], Headers, [], self()) of
client_pid => self()}], Headers, Body, [], self()) of
{error, _Code} = Err ->
Err;
{StreamId, Pid} ->
h2_connection:send_body(Conn, StreamId, Body),
{ok, Conn, StreamId, Pid}
end;
{error, _}=Error ->
Expand Down
18 changes: 9 additions & 9 deletions src/grpcbox_stream.erl
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@
full_method :: binary() | undefined,
input_ref :: reference() | undefined,
callback_pid :: pid() | undefined,
connection_pid :: pid(),
connection :: h2_stream_set:stream_set(),
request_encoding :: gzip | identity | undefined,
response_encoding :: gzip | identity | undefined,
content_type :: proto | json | undefined,
Expand Down Expand Up @@ -83,10 +83,10 @@
}.
-type grpc_extended_error_response() :: {grpc_extended_error, grpc_error_data()}.

init(ConnPid, StreamId, [Socket, ServicesTable, AuthFun, UnaryInterceptor,
init(Conn, StreamId, [Socket, ServicesTable, AuthFun, UnaryInterceptor,
StreamInterceptor, StatsHandler]) ->
process_flag(trap_exit, true),
State = #state{connection_pid=ConnPid,
State = #state{connection=Conn,
stream_id=StreamId,
services_table=ServicesTable,
buffer = <<>>,
Expand Down Expand Up @@ -337,12 +337,12 @@ end_stream(Status, Message, State=#state{headers_sent=false}) ->
end_stream(Status, Message, send_headers(State));
end_stream(_Status, _Message, State=#state{trailers_sent=true}) ->
{ok, State};
end_stream(Status, Message, State=#state{connection_pid=ConnPid,
end_stream(Status, Message, State=#state{connection=Conn,
stream_id=StreamId,
ctx=Ctx,
resp_trailers=Trailers}) ->
EncodedTrailers = grpcbox_utils:encode_headers(Trailers),
h2_connection:send_trailers(ConnPid, StreamId, [{<<"grpc-status">>, Status},
h2_connection:send_trailers(Conn, StreamId, [{<<"grpc-status">>, Status},
{<<"grpc-message">>, Message} | EncodedTrailers],
[{send_end_stream, true}]),
Ctx1 = ctx:with_value(Ctx, grpc_server_status, grpcbox_utils:status_to_string(Status)),
Expand All @@ -362,12 +362,12 @@ send_headers(Ctx, Headers) when is_map(Headers) ->

send_headers(_Metadata, State=#state{headers_sent=true}) ->
State;
send_headers(Metadata, State=#state{connection_pid=ConnPid,
send_headers(Metadata, State=#state{connection=Conn,
stream_id=StreamId,
resp_headers=Headers,
headers_sent=false}) ->
MdHeaders = grpcbox_utils:encode_headers(Metadata),
h2_connection:send_headers(ConnPid, StreamId, Headers ++ MdHeaders, [{send_end_stream, false}]),
h2_connection:send_headers(Conn, StreamId, Headers ++ MdHeaders, [{send_end_stream, false}]),
State#state{headers_sent=true}.

code_to_status(0) -> ?GRPC_STATUS_OK;
Expand Down Expand Up @@ -449,15 +449,15 @@ send(End, Message, State=#state{headers_sent=false}) ->
State1 = send_headers(State),
send(End, Message, State1);
send(End, Message, State=#state{ctx=Ctx,
connection_pid=ConnPid,
connection=Conn,
stream_id=StreamId,
response_encoding=Encoding,
method=#method{proto=Proto,
input={_Input, _},
output={Output, _}}}) ->
BodyToSend = Proto:encode_msg(Message, Output),
OutFrame = grpcbox_frame:encode(Encoding, BodyToSend),
ok = h2_connection:send_body(ConnPid, StreamId, OutFrame, [{send_end_stream, End}]),
ok = h2_connection:send_body(Conn, StreamId, OutFrame, [{send_end_stream, End}]),
stats_handler(Ctx, out_payload, #{uncompressed_size => erlang:external_size(Message),
compressed_size => size(BodyToSend)}, State).

Expand Down
22 changes: 12 additions & 10 deletions src/grpcbox_subchannel.erl
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,8 @@
encoding := grpcbox:encoding(),
stats_handler := module() | undefined
},
conn :: pid() | undefined,
conn :: h2_stream_set:stream_set() | undefined,
conn_pid :: pid() | undefined,
idle_interval :: timer:time()}).

start_link(Name, Channel, Endpoint, Encoding, StatsHandler) ->
Expand Down Expand Up @@ -79,9 +80,9 @@ disconnected(EventType, EventContent, Data) ->

handle_event({call, From}, info, #data{info=Info}) ->
{keep_state_and_data, [{reply, From, Info}]};
handle_event(info, {'EXIT', Pid, _}, Data=#data{conn=Pid}) ->
{next_state, disconnected, Data#data{conn=undefined}};
handle_event(info, {'EXIT', _, econnrefused}, #data{conn=undefined}) ->
handle_event(info, {'EXIT', Pid, _}, Data=#data{conn_pid=Pid}) ->
{next_state, disconnected, Data#data{conn=undefined, conn_pid=undefined}};
handle_event(info, {'EXIT', _, econnrefused}, #data{conn=undefined, conn_pid=undefined}) ->
keep_state_and_data;
handle_event({call, From}, shutdown, _) ->
{stop_and_reply, normal, {reply, From, ok}};
Expand All @@ -101,7 +102,7 @@ terminate(normal, _State, #data{conn=Pid,
gproc_pool:disconnect_worker(Channel, Endpoint),
gproc_pool:remove_worker(Channel, Endpoint),
ok;
terminate(Reason, _State, #data{conn=Pid,
terminate(Reason, _State, #data{conn_pid=Pid,
endpoint=Endpoint,
channel=Channel}) ->
exit(Pid, Reason),
Expand All @@ -122,14 +123,15 @@ connect(Data=#data{conn=undefined,
stream_callback_mod => grpcbox_client_stream,
connect_timeout => ConnectTimeout,
tcp_user_timeout => TcpUserTimeout}) of
{ok, Pid} ->
{next_state, ready, Data#data{conn=Pid}, Actions};
{ok, Conn} ->
Pid = h2_stream_set:connection(Conn),
{next_state, ready, Data#data{conn=Conn, conn_pid=Pid}, Actions};
{error, _}=Error ->
{next_state, disconnected, Data#data{conn=undefined}, [{reply, From, Error}]}
end;
connect(Data=#data{conn=Pid}, From, Actions) when is_pid(Pid) ->
h2_connection:stop(Pid),
connect(Data#data{conn=undefined}, From, Actions).
connect(Data=#data{conn=Conn, conn_pid=Pid}, From, Actions) when is_pid(Pid) ->
h2_connection:stop(Conn),
connect(Data#data{conn=undefined, conn_pid=undefined}, From, Actions).

options(https, Options) ->
[{client_preferred_next_protocols, {client, [<<"h2">>]}} | Options];
Expand Down
8 changes: 2 additions & 6 deletions test/grpcbox_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -694,18 +694,14 @@ stream_interceptor(_Config) ->
%% verify that the chatterbox stream isn't storing frame data
check_stream_state(S) ->
{_, StreamState} = sys:get_state(maps:get(stream_pid, S)),
FrameQueue = element(6, StreamState),
FrameQueue = element(7, StreamState),
?assert(queue:is_empty(FrameQueue)).

%% return the stream_set of a connection in the channel
connection_stream_set() ->
{ok, {Channel, _}} = grpcbox_channel:pick(default_channel, unary),
{ok, Conn, _} = grpcbox_subchannel:conn(Channel),
{connected, ConnState} = sys:get_state(Conn),

%% I know, I know, this will fail if the connection record in h2_connection ever has elements
%% added before the stream_set field. But for now, it is 14 and that's good enough.
element(14, ConnState).
Conn.

cert_dir(Config) ->
DataDir = ?config(data_dir, Config),
Expand Down

0 comments on commit da0ab58

Please sign in to comment.