Skip to content

Commit

Permalink
Merge pull request #56 from omnicate/grpc-timeout-upstream
Browse files Browse the repository at this point in the history
grpc timeout
  • Loading branch information
tsloughter authored May 4, 2021
2 parents 49d9275 + 10af6e9 commit 0166760
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 8 deletions.
2 changes: 1 addition & 1 deletion src/grpcbox_client_stream.erl
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ metadata_headers(Ctx) ->
D when D =:= undefined ; D =:= infinity ->
grpcbox_utils:encode_headers(maps:to_list(grpcbox_metadata:from_outgoing_ctx(Ctx)));
{T, _} ->
Timeout = {<<"grpc-timeout">>, <<(integer_to_binary(T - erlang:monotonic_time()))/binary, "S">>},
Timeout = {<<"grpc-timeout">>, <<(integer_to_binary(T - erlang:monotonic_time()))/binary, "n">>},
grpcbox_utils:encode_headers([Timeout | maps:to_list(grpcbox_metadata:from_outgoing_ctx(Ctx))])
end.

Expand Down
18 changes: 11 additions & 7 deletions src/grpcbox_stream.erl
Original file line number Diff line number Diff line change
Expand Up @@ -96,11 +96,13 @@ on_receive_headers(Headers, State=#state{ctx=_Ctx}) ->
%% proplists:get_value(<<":method">>, Headers) =:= <<"POST">>,
Metadata = grpcbox_utils:headers_to_metadata(Headers),
Ctx = case parse_options(<<"grpc-timeout">>, Headers) of
infinity ->
grpcbox_metadata:new_incoming_ctx(Metadata);
D ->
ctx:with_deadline_after(grpcbox_metadata:new_incoming_ctx(Metadata), D, nanosecond)
end,
infinity ->
grpcbox_metadata:new_incoming_ctx(Metadata);
D ->
Deadline = max(0, erlang:convert_time_unit(D, nanosecond, millisecond)),
erlang:start_timer(Deadline, self(), <<"grpc-timeout">>),
ctx:with_deadline_after(grpcbox_metadata:new_incoming_ctx(Metadata), D, nanosecond)
end,

FullPath = proplists:get_value(<<":path">>, Headers),
%% wait to rpc_begin here since we need to know the method
Expand Down Expand Up @@ -409,10 +411,12 @@ handle_info({'EXIT', _, {grpc_extended_error, #{status := Status, message := Mes
handle_info({'EXIT', _, _Other}, State) ->
end_stream(?GRPC_STATUS_UNKNOWN, <<"process exited without reason">>, State),
State;
handle_info({timeout,_Ref,<<"grpc-timeout">>}, State) ->
end_stream(?GRPC_STATUS_DEADLINE_EXCEEDED, <<"Deadline expired">>, State),
State;
handle_info(_, State) ->
State.


add_headers(Headers, #state{handler=Pid}) ->
Pid ! {add_headers, Headers}.

Expand Down Expand Up @@ -472,7 +476,7 @@ timeout_to_duration(T, <<"m">>) ->
timeout_to_duration(T, <<"u">>) ->
erlang:convert_time_unit(T, microsecond, nanosecond);
timeout_to_duration(T, <<"n">>) ->
timer:seconds(T).
T.

parse_options(<<"grpc-timeout">>, Headers) ->
case proplists:get_value(<<"grpc-timeout">>, Headers, infinity) of
Expand Down

0 comments on commit 0166760

Please sign in to comment.