From cb45e306a39af30419147e391f9544f2194f4cc7 Mon Sep 17 00:00:00 2001 From: Daniel Hobbs Date: Wed, 10 Nov 2021 11:37:57 -0700 Subject: [PATCH 1/5] allow user provided metadata to override defaults --- src/grpcbox_client_stream.erl | 30 ++++++++++++++++++------------ 1 file changed, 18 insertions(+), 12 deletions(-) diff --git a/src/grpcbox_client_stream.erl b/src/grpcbox_client_stream.erl index f1d7bf8..1be6cfb 100644 --- a/src/grpcbox_client_stream.erl +++ b/src/grpcbox_client_stream.erl @@ -13,15 +13,15 @@ -include("grpcbox.hrl"). --define(headers(Scheme, Host, Path, Encoding, MessageType, MD), [{<<":method">>, <<"POST">>}, - {<<":path">>, Path}, - {<<":scheme">>, Scheme}, - {<<":authority">>, Host}, - {<<"grpc-encoding">>, Encoding}, - {<<"grpc-message-type">>, MessageType}, - {<<"content-type">>, <<"application/grpc+proto">>}, - {<<"user-agent">>, <<"grpc-erlang/0.9.2">>}, - {<<"te">>, <<"trailers">>} | MD]). +-define(default_headers(Scheme, Host, Path, Encoding, MessageType), [{<<":method">>, <<"POST">>}, + {<<":path">>, Path}, + {<<":scheme">>, Scheme}, + {<<":authority">>, Host}, + {<<"grpc-encoding">>, Encoding}, + {<<"grpc-message-type">>, MessageType}, + {<<"content-type">>, <<"application/grpc+proto">>}, + {<<"user-agent">>, <<"grpc-erlang/0.9.2">>}, + {<<"te">>, <<"trailers">>}]). new_stream(Ctx, Channel, Path, Def=#grpcbox_def{service=Service, message_type=MessageType, @@ -33,8 +33,8 @@ new_stream(Ctx, Channel, Path, Def=#grpcbox_def{service=Service, encoding := DefaultEncoding, stats_handler := StatsHandler}} -> Encoding = maps:get(encoding, Options, DefaultEncoding), - RequestHeaders = ?headers(Scheme, Authority, Path, encoding_to_binary(Encoding), - MessageType, metadata_headers(Ctx)), + RequestHeaders = merge_headers(?default_headers(Scheme, Authority, Path, encoding_to_binary(Encoding), + MessageType), metadata_headers(Ctx)), case h2_connection:new_stream(Conn, ?MODULE, [#{service => Service, marshal_fun => MarshalFun, unmarshal_fun => UnMarshalFun, @@ -70,7 +70,8 @@ send_request(Ctx, Channel, Path, Input, #grpcbox_def{service=Service, stats_handler := StatsHandler}} -> Encoding = maps:get(encoding, Options, DefaultEncoding), Body = grpcbox_frame:encode(Encoding, MarshalFun(Input)), - Headers = ?headers(Scheme, Authority, Path, encoding_to_binary(Encoding), MessageType, metadata_headers(Ctx)), + Headers = merge_headers(?default_headers(Scheme, Authority, Path, encoding_to_binary(Encoding), + MessageType), metadata_headers(Ctx)), %% headers are sent in the same request as creating a new stream to ensure %% concurrent calls can't end up interleaving the sending of headers in such @@ -223,3 +224,8 @@ encoding_to_binary(deflate) -> <<"deflate">>; encoding_to_binary(snappy) -> <<"snappy">>; encoding_to_binary(Custom) -> atom_to_binary(Custom, latin1). +merge_headers(Defaults, MD) -> + Map1 = proplists:to_map(Defaults), + Map2 = proplists:to_map(MD), + Merged = maps:merge(Map1, Map2), + proplists:from_map(Merged). From 6daa2b258e495dfec291ec5457d5862d8f5d3bbe Mon Sep 17 00:00:00 2001 From: Daniel Hobbs Date: Thu, 11 Nov 2021 11:04:26 -0700 Subject: [PATCH 2/5] prepend user metadata to headers --- src/grpcbox_client_stream.erl | 57 ++++++++++++++++++++++++----------- 1 file changed, 39 insertions(+), 18 deletions(-) diff --git a/src/grpcbox_client_stream.erl b/src/grpcbox_client_stream.erl index 1be6cfb..fc5ea0f 100644 --- a/src/grpcbox_client_stream.erl +++ b/src/grpcbox_client_stream.erl @@ -13,15 +13,16 @@ -include("grpcbox.hrl"). --define(default_headers(Scheme, Host, Path, Encoding, MessageType), [{<<":method">>, <<"POST">>}, - {<<":path">>, Path}, - {<<":scheme">>, Scheme}, - {<<":authority">>, Host}, - {<<"grpc-encoding">>, Encoding}, - {<<"grpc-message-type">>, MessageType}, - {<<"content-type">>, <<"application/grpc+proto">>}, - {<<"user-agent">>, <<"grpc-erlang/0.9.2">>}, - {<<"te">>, <<"trailers">>}]). +-define(headers(Scheme, Host, Path, Encoding, MessageType, MD), MD ++ + [{<<":method">>, <<"POST">>}, + {<<":path">>, Path}, + {<<":scheme">>, Scheme}, + {<<":authority">>, Host}, + {<<"grpc-encoding">>, Encoding}, + {<<"grpc-message-type">>, MessageType}, + {<<"content-type">>, <<"application/grpc+proto">>}, + {<<"user-agent">>, <<"grpc-erlang/0.9.2">>}, + {<<"te">>, <<"trailers">>}]). new_stream(Ctx, Channel, Path, Def=#grpcbox_def{service=Service, message_type=MessageType, @@ -33,8 +34,8 @@ new_stream(Ctx, Channel, Path, Def=#grpcbox_def{service=Service, encoding := DefaultEncoding, stats_handler := StatsHandler}} -> Encoding = maps:get(encoding, Options, DefaultEncoding), - RequestHeaders = merge_headers(?default_headers(Scheme, Authority, Path, encoding_to_binary(Encoding), - MessageType), metadata_headers(Ctx)), + RequestHeaders = ?headers(Scheme, Authority, Path, encoding_to_binary(Encoding), + MessageType, metadata_headers(Ctx)), case h2_connection:new_stream(Conn, ?MODULE, [#{service => Service, marshal_fun => MarshalFun, unmarshal_fun => UnMarshalFun, @@ -70,8 +71,8 @@ send_request(Ctx, Channel, Path, Input, #grpcbox_def{service=Service, stats_handler := StatsHandler}} -> Encoding = maps:get(encoding, Options, DefaultEncoding), Body = grpcbox_frame:encode(Encoding, MarshalFun(Input)), - Headers = merge_headers(?default_headers(Scheme, Authority, Path, encoding_to_binary(Encoding), - MessageType), metadata_headers(Ctx)), + Headers = ?headers(Scheme, Authority, Path, encoding_to_binary(Encoding), + MessageType, metadata_headers(Ctx)), %% headers are sent in the same request as creating a new stream to ensure %% concurrent calls can't end up interleaving the sending of headers in such @@ -224,8 +225,28 @@ encoding_to_binary(deflate) -> <<"deflate">>; encoding_to_binary(snappy) -> <<"snappy">>; encoding_to_binary(Custom) -> atom_to_binary(Custom, latin1). -merge_headers(Defaults, MD) -> - Map1 = proplists:to_map(Defaults), - Map2 = proplists:to_map(MD), - Merged = maps:merge(Map1, Map2), - proplists:from_map(Merged). +-ifdef(TEST). +-include_lib("eunit/include/eunit.hrl"). + +headers_test() -> + {Scheme, Host, Path, Encoding, MsgType} = {<<"http">>, <<"localhost:9898">>, <<"/grpc.Test/Test">>, <<"identity">>, <<"grpc.TestRequest">>}, + Ctx = ctx:new(), + Ctx1 = grpcbox_metadata:append_to_outgoing_ctx(Ctx, #{<<"x-custom-header">> => <<"the-first-shall-be-first">>, + <<"content-type">> => <<"application/grpc">>}), + Headers = ?headers(Scheme, Host, Path, Encoding, MsgType, metadata_headers(Ctx1)), + + ?assertEqual([{<<"content-type">>, <<"application/grpc">>}, + {<<"x-custom-header">>, <<"the-first-shall-be-first">>}, + {<<":method">>, <<"POST">>}, + {<<":path">>, <<"/grpc.Test/Test">>}, + {<<":scheme">>, <<"http">>}, + {<<":authority">>, <<"localhost:9898">>}, + {<<"grpc-encoding">>, <<"identity">>}, + {<<"grpc-message-type">>, <<"grpc.TestRequest">>}, + {<<"content-type">>, <<"application/grpc+proto">>}, + {<<"user-agent">>, <<"grpc-erlang/0.9.2">>}, + {<<"te">>, <<"trailers">>} + ], Headers), + ok. + +-endif. From 7d09180d597fe84afdb54397c5bdfb6c8a9ecce1 Mon Sep 17 00:00:00 2001 From: Daniel Hobbs Date: Fri, 12 Nov 2021 10:51:08 -0700 Subject: [PATCH 3/5] * join duplicate headers unless they are protected --- src/grpcbox_client_stream.erl | 49 ++++++++++++++++++++++++++--------- 1 file changed, 37 insertions(+), 12 deletions(-) diff --git a/src/grpcbox_client_stream.erl b/src/grpcbox_client_stream.erl index fc5ea0f..c95bdbb 100644 --- a/src/grpcbox_client_stream.erl +++ b/src/grpcbox_client_stream.erl @@ -13,6 +13,7 @@ -include("grpcbox.hrl"). +-define(protected_headers, [<<":method">>, <<":path">>, <<":scheme">>, <<":authority">>, <<"content-type">>, <<"te">>]). -define(headers(Scheme, Host, Path, Encoding, MessageType, MD), MD ++ [{<<":method">>, <<"POST">>}, {<<":path">>, Path}, @@ -34,8 +35,8 @@ new_stream(Ctx, Channel, Path, Def=#grpcbox_def{service=Service, encoding := DefaultEncoding, stats_handler := StatsHandler}} -> Encoding = maps:get(encoding, Options, DefaultEncoding), - RequestHeaders = ?headers(Scheme, Authority, Path, encoding_to_binary(Encoding), - MessageType, metadata_headers(Ctx)), + RequestHeaders = merge_headers(?headers(Scheme, Authority, Path, encoding_to_binary(Encoding), + MessageType, metadata_headers(Ctx))), case h2_connection:new_stream(Conn, ?MODULE, [#{service => Service, marshal_fun => MarshalFun, unmarshal_fun => UnMarshalFun, @@ -71,8 +72,8 @@ send_request(Ctx, Channel, Path, Input, #grpcbox_def{service=Service, stats_handler := StatsHandler}} -> Encoding = maps:get(encoding, Options, DefaultEncoding), Body = grpcbox_frame:encode(Encoding, MarshalFun(Input)), - Headers = ?headers(Scheme, Authority, Path, encoding_to_binary(Encoding), - MessageType, metadata_headers(Ctx)), + Headers = merge_headers(?headers(Scheme, Authority, Path, encoding_to_binary(Encoding), + MessageType, metadata_headers(Ctx))), %% headers are sent in the same request as creating a new stream to ensure %% concurrent calls can't end up interleaving the sending of headers in such @@ -225,28 +226,52 @@ encoding_to_binary(deflate) -> <<"deflate">>; encoding_to_binary(snappy) -> <<"snappy">>; encoding_to_binary(Custom) -> atom_to_binary(Custom, latin1). +merge_headers(Headers) -> + lists:foldl(fun merge_header_field/2, [], Headers). + +merge_header_field({K, V}, HeadersAcc) -> + case {is_protected_header(K), proplists:is_defined(K, HeadersAcc)} of + {true, true} -> + % is protected and already exists, skip + HeadersAcc; + {false, true} -> + % isn't protected and already exists, join + join_header_values({K, V}, HeadersAcc); + {_, false} -> + % doesn't exist, add + [{K, V} | HeadersAcc] + end. + +join_header_values({Name, Val}, HeadersAcc) -> + OrigVal = proplists:get_value(Name, HeadersAcc), + NewValue = <>/binary, Val/binary>>, + NewList = lists:keyreplace(Name, 1, HeadersAcc, {Name, NewValue}), + NewList. + +is_protected_header(Name) -> + lists:member(Name, ?protected_headers). + -ifdef(TEST). -include_lib("eunit/include/eunit.hrl"). -headers_test() -> +merge_headers_test() -> {Scheme, Host, Path, Encoding, MsgType} = {<<"http">>, <<"localhost:9898">>, <<"/grpc.Test/Test">>, <<"identity">>, <<"grpc.TestRequest">>}, Ctx = ctx:new(), - Ctx1 = grpcbox_metadata:append_to_outgoing_ctx(Ctx, #{<<"x-custom-header">> => <<"the-first-shall-be-first">>, - <<"content-type">> => <<"application/grpc">>}), - Headers = ?headers(Scheme, Host, Path, Encoding, MsgType, metadata_headers(Ctx1)), + Ctx1 = grpcbox_metadata:append_to_outgoing_ctx(Ctx, #{<<"content-type">> => <<"application/grpc">>, + <<"user-agent">> => <<"custom-grpc-client">>}), + Headers0 = ?headers(Scheme, Host, Path, Encoding, MsgType, metadata_headers(Ctx1)), + Headers = merge_headers(Headers0), ?assertEqual([{<<"content-type">>, <<"application/grpc">>}, - {<<"x-custom-header">>, <<"the-first-shall-be-first">>}, + {<<"user-agent">>, <<"custom-grpc-client, grpc-erlang/0.9.2">>}, {<<":method">>, <<"POST">>}, {<<":path">>, <<"/grpc.Test/Test">>}, {<<":scheme">>, <<"http">>}, {<<":authority">>, <<"localhost:9898">>}, {<<"grpc-encoding">>, <<"identity">>}, {<<"grpc-message-type">>, <<"grpc.TestRequest">>}, - {<<"content-type">>, <<"application/grpc+proto">>}, - {<<"user-agent">>, <<"grpc-erlang/0.9.2">>}, {<<"te">>, <<"trailers">>} - ], Headers), + ], lists:reverse(Headers)), ok. -endif. From 6496cda0edd7db4e9c23633ed250a03312ee9afd Mon Sep 17 00:00:00 2001 From: Daniel Hobbs Date: Fri, 12 Nov 2021 13:11:54 -0700 Subject: [PATCH 4/5] pseudoheaders must be first in the headers list --- src/grpcbox_client_stream.erl | 64 ++++++++++++++++++++--------------- 1 file changed, 37 insertions(+), 27 deletions(-) diff --git a/src/grpcbox_client_stream.erl b/src/grpcbox_client_stream.erl index c95bdbb..fa7a605 100644 --- a/src/grpcbox_client_stream.erl +++ b/src/grpcbox_client_stream.erl @@ -13,17 +13,16 @@ -include("grpcbox.hrl"). --define(protected_headers, [<<":method">>, <<":path">>, <<":scheme">>, <<":authority">>, <<"content-type">>, <<"te">>]). --define(headers(Scheme, Host, Path, Encoding, MessageType, MD), MD ++ - [{<<":method">>, <<"POST">>}, - {<<":path">>, Path}, - {<<":scheme">>, Scheme}, - {<<":authority">>, Host}, - {<<"grpc-encoding">>, Encoding}, - {<<"grpc-message-type">>, MessageType}, - {<<"content-type">>, <<"application/grpc+proto">>}, - {<<"user-agent">>, <<"grpc-erlang/0.9.2">>}, - {<<"te">>, <<"trailers">>}]). +-define(protected_headers, [<<"content-type">>, <<"te">>]). +-define(pseudoheaders(Path, Scheme, Authority), [{<<":method">>, <<"POST">>}, + {<<":path">>, Path}, + {<<":scheme">>, Scheme}, + {<<":authority">>, Authority}]). +-define(headers(Encoding, MessageType, MD), MD ++ [{<<"grpc-encoding">>, Encoding}, + {<<"grpc-message-type">>, MessageType}, + {<<"content-type">>, <<"application/grpc+proto">>}, + {<<"user-agent">>, <<"grpc-erlang/0.9.2">>}, + {<<"te">>, <<"trailers">>}]). new_stream(Ctx, Channel, Path, Def=#grpcbox_def{service=Service, message_type=MessageType, @@ -35,8 +34,9 @@ new_stream(Ctx, Channel, Path, Def=#grpcbox_def{service=Service, encoding := DefaultEncoding, stats_handler := StatsHandler}} -> Encoding = maps:get(encoding, Options, DefaultEncoding), - RequestHeaders = merge_headers(?headers(Scheme, Authority, Path, encoding_to_binary(Encoding), - MessageType, metadata_headers(Ctx))), + UserHeaders = merge_headers(?headers(encoding_to_binary(Encoding), MessageType, metadata_headers(Ctx))), + RequestHeaders = ?pseudoheaders(Path, Scheme, Authority) ++ UserHeaders, + case h2_connection:new_stream(Conn, ?MODULE, [#{service => Service, marshal_fun => MarshalFun, unmarshal_fun => UnMarshalFun, @@ -72,8 +72,8 @@ send_request(Ctx, Channel, Path, Input, #grpcbox_def{service=Service, stats_handler := StatsHandler}} -> Encoding = maps:get(encoding, Options, DefaultEncoding), Body = grpcbox_frame:encode(Encoding, MarshalFun(Input)), - Headers = merge_headers(?headers(Scheme, Authority, Path, encoding_to_binary(Encoding), - MessageType, metadata_headers(Ctx))), + UserHeaders = merge_headers(?headers(encoding_to_binary(Encoding), MessageType, metadata_headers(Ctx))), + RequestHeaders = ?pseudoheaders(Path, Scheme, Authority) ++ UserHeaders, %% headers are sent in the same request as creating a new stream to ensure %% concurrent calls can't end up interleaving the sending of headers in such @@ -86,7 +86,7 @@ send_request(Ctx, Channel, Path, Input, #grpcbox_def{service=Service, buffer => <<>>, stats_handler => StatsHandler, stats => #{}, - client_pid => self()}], Headers, [], self()) of + client_pid => self()}], RequestHeaders, [], self()) of {error, _Code} = Err -> Err; {StreamId, Pid} -> @@ -255,23 +255,33 @@ is_protected_header(Name) -> -include_lib("eunit/include/eunit.hrl"). merge_headers_test() -> - {Scheme, Host, Path, Encoding, MsgType} = {<<"http">>, <<"localhost:9898">>, <<"/grpc.Test/Test">>, <<"identity">>, <<"grpc.TestRequest">>}, + {Encoding, MsgType} = {<<"identity">>, <<"grpc.TestRequest">>}, Ctx = ctx:new(), Ctx1 = grpcbox_metadata:append_to_outgoing_ctx(Ctx, #{<<"content-type">> => <<"application/grpc">>, <<"user-agent">> => <<"custom-grpc-client">>}), - Headers0 = ?headers(Scheme, Host, Path, Encoding, MsgType, metadata_headers(Ctx1)), - Headers = merge_headers(Headers0), + Headers0 = ?headers(Encoding, MsgType, metadata_headers(Ctx1)), + Headers1 = merge_headers(Headers0), - ?assertEqual([{<<"content-type">>, <<"application/grpc">>}, - {<<"user-agent">>, <<"custom-grpc-client, grpc-erlang/0.9.2">>}, - {<<":method">>, <<"POST">>}, - {<<":path">>, <<"/grpc.Test/Test">>}, - {<<":scheme">>, <<"http">>}, - {<<":authority">>, <<"localhost:9898">>}, + ?assertEqual([{<<"te">>, <<"trailers">>}, + {<<"grpc-message-type">>, <<"grpc.TestRequest">>}, {<<"grpc-encoding">>, <<"identity">>}, + {<<"user-agent">>, <<"custom-grpc-client, grpc-erlang/0.9.2">>}, + {<<"content-type">>, <<"application/grpc">>} + ], Headers1), + ok. + +merge_headers_empty_ctx_test() -> + {Encoding, MsgType} = {<<"identity">>, <<"grpc.TestRequest">>}, + Ctx = ctx:new(), + Headers0 = ?headers(Encoding, MsgType, metadata_headers(Ctx)), + Headers1 = merge_headers(Headers0), + + ?assertEqual([{<<"te">>, <<"trailers">>}, + {<<"user-agent">>, <<"grpc-erlang/0.9.2">>}, + {<<"content-type">>, <<"application/grpc+proto">>}, {<<"grpc-message-type">>, <<"grpc.TestRequest">>}, - {<<"te">>, <<"trailers">>} - ], lists:reverse(Headers)), + {<<"grpc-encoding">>, <<"identity">>} + ], Headers1), ok. -endif. From 3c2ec9e6eb4f088407499798add913778a001a15 Mon Sep 17 00:00:00 2001 From: Daniel Hobbs Date: Mon, 15 Nov 2021 10:25:46 -0700 Subject: [PATCH 5/5] wrap headers/3 in unit to avoid ++ operator precedence rules funniness --- src/grpcbox_client_stream.erl | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/grpcbox_client_stream.erl b/src/grpcbox_client_stream.erl index fa7a605..faf5b75 100644 --- a/src/grpcbox_client_stream.erl +++ b/src/grpcbox_client_stream.erl @@ -18,11 +18,11 @@ {<<":path">>, Path}, {<<":scheme">>, Scheme}, {<<":authority">>, Authority}]). --define(headers(Encoding, MessageType, MD), MD ++ [{<<"grpc-encoding">>, Encoding}, +-define(headers(Encoding, MessageType, MD), (MD ++ [{<<"grpc-encoding">>, Encoding}, {<<"grpc-message-type">>, MessageType}, {<<"content-type">>, <<"application/grpc+proto">>}, {<<"user-agent">>, <<"grpc-erlang/0.9.2">>}, - {<<"te">>, <<"trailers">>}]). + {<<"te">>, <<"trailers">>}])). new_stream(Ctx, Channel, Path, Def=#grpcbox_def{service=Service, message_type=MessageType,