Skip to content

Commit

Permalink
enable waiting for the first part of data in EncodeHeader (#862)
Browse files Browse the repository at this point in the history
Signed-off-by: spacewander <[email protected]>
  • Loading branch information
spacewander authored Feb 26, 2025
1 parent b15f29a commit 4f7b8b7
Show file tree
Hide file tree
Showing 4 changed files with 63 additions and 5 deletions.
3 changes: 3 additions & 0 deletions api/pkg/filtermanager/api/result_action.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,9 @@ var (
// WaitAllData controls if the request/response body needs to be fully buffered during processing by Go plugin.
// If this action is returned, DecodeData/EncodeData will be called by DecodeRequest/EncodeResponse.
WaitAllData ResultAction = &isResultAction{typeid: 1}
// WaitData buffers the response header until the first response body is received.
// TODO: currently, we only support using WaitData in the EncodeHeader method.
WaitData ResultAction = &isResultAction{typeid: 2}
// LocalResponse controls if a local reply should be returned from Envoy instead of using the
// upstream response. See comments below for how to use it.
)
Expand Down
13 changes: 13 additions & 0 deletions api/pkg/filtermanager/filtermanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ type filterManager struct {
reqBuf capi.BufferInstance // don't access it in Encode phases

encodeResponseNeeded bool
encodeWaitFirstData bool
encodeIdx int
rspHdr api.ResponseHeaderMap
rspBuf capi.BufferInstance
Expand Down Expand Up @@ -81,6 +82,7 @@ func (m *filterManager) Reset() {
m.reqBuf = nil

m.encodeResponseNeeded = false
m.encodeWaitFirstData = false
m.encodeIdx = -1
m.rspHdr = nil
m.rspBuf = nil
Expand Down Expand Up @@ -275,6 +277,14 @@ func (m *filterManager) handleAction(res api.ResultAction, phase api.Phase, filt
}
return false
}
if res == api.WaitData {
if phase == api.PhaseEncodeHeaders {
m.encodeWaitFirstData = true
} else {
api.LogErrorf("WaitAllData only allowed when processing response headers, phase: %v.", phase)
}
return false
}

switch v := res.(type) {
case *api.LocalResponse:
Expand Down Expand Up @@ -805,6 +815,9 @@ func (m *filterManager) encodeHeaders(headers capi.ResponseHeaderMap, endStream
}
}

if m.encodeWaitFirstData {
return capi.StopAndBufferWatermark
}
return capi.Continue
}

Expand Down
42 changes: 42 additions & 0 deletions api/pkg/filtermanager/filtermanager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -833,3 +833,45 @@ func TestSyncRunWhenProcessingBufferedDataWithFiltersFromConsumer(t *testing.T)
assert.Equal(t, capi.Running, res)
cb.WaitContinued()
}

func waitDataFactory(_ interface{}, callbacks api.FilterCallbackHandler) api.Filter {
return &waitDataFilter{
cb: callbacks,
}
}

type waitDataFilter struct {
api.PassThroughFilter

cb api.FilterCallbackHandler
}

func (f *waitDataFilter) DecodeHeaders(_ api.RequestHeaderMap, _ bool) api.ResultAction {
return api.WaitData
}

func (f *waitDataFilter) EncodeHeaders(_ api.ResponseHeaderMap, _ bool) api.ResultAction {
return api.WaitData
}

func TestWaitData(t *testing.T) {
cb := envoy.NewCAPIFilterCallbackHandler()
config := initFilterManagerConfig("ns")
config.parsed = []*model.ParsedFilterConfig{
{
Name: "waitData",
Factory: waitDataFactory,
},
}

m := unwrapFilterManager(FilterManagerFactory(config, cb))
h := http.Header{}
hdr := envoy.NewRequestHeaderMap(h)
m.DecodeHeaders(hdr, false)
res := cb.WaitContinued()
assert.Equal(t, capi.Continue, res)
respHdr := envoy.NewResponseHeaderMap(h)
m.EncodeHeaders(respHdr, false)
res = cb.WaitContinued()
assert.Equal(t, capi.StopAndBufferWatermark, res)
}
10 changes: 5 additions & 5 deletions api/plugins/tests/pkg/envoy/capi.go
Original file line number Diff line number Diff line change
Expand Up @@ -493,15 +493,15 @@ type filterCallbackHandler struct {
resp LocalResponse
consumer api.Consumer
pluginState api.PluginState
ch chan struct{}
ch chan capi.StatusType
}

func NewFilterCallbackHandler() *filterCallbackHandler {
return &filterCallbackHandler{
lock: &sync.RWMutex{},
// we create channel with buffer so the goroutine won't leak when we don't call WaitContinued
// manually. When running in Envoy, Envoy won't re-run the filter until Continue is called.
ch: make(chan struct{}, 10),
ch: make(chan capi.StatusType, 10),
streamInfo: &StreamInfo{},
}
}
Expand All @@ -519,11 +519,11 @@ func (i *filterCallbackHandler) SetStreamInfo(data api.StreamInfo) {
}

func (i *filterCallbackHandler) Continue(status capi.StatusType) {
i.ch <- struct{}{}
i.ch <- status
}

func (i *filterCallbackHandler) WaitContinued() {
<-i.ch
func (i *filterCallbackHandler) WaitContinued() capi.StatusType {
return <-i.ch
}

func (i *filterCallbackHandler) SendLocalReply(responseCode int, bodyText string, headers map[string][]string, grpcStatus int64, details string) {
Expand Down

0 comments on commit 4f7b8b7

Please sign in to comment.