Skip to content

Commit

Permalink
decode user key for filter
Browse files Browse the repository at this point in the history
Signed-off-by: Ping Yu <[email protected]>
  • Loading branch information
pingyu committed Aug 25, 2023
1 parent 5e65388 commit c305708
Show file tree
Hide file tree
Showing 4 changed files with 57 additions and 19 deletions.
13 changes: 9 additions & 4 deletions cdc/cdc/kv/region_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -664,10 +664,15 @@ func (w *regionWorker) handleEventEntry(
case cdcpb.Event_COMMITTED:
w.metrics.metricPullEventCommittedCounter.Inc()

if w.eventFilter != nil && !w.eventFilter.EventMatch(entry) {
w.metrics.metricFilterOutEventCommittedCounter.Inc()
log.Debug("handleEventEntry: event is filter out and drop", zap.String("OpType", entry.OpType.String()), zap.String("key", string(entry.Key)))
continue
if w.eventFilter != nil {
matched, err := w.eventFilter.EventMatch(entry)
// EventMatch will return error when fail to decode key.
// Pass such entry to be handled by following steps.
if err == nil && !matched {
w.metrics.metricFilterOutEventCommittedCounter.Inc()
log.Debug("handleEventEntry: event is filter out and drop", zap.String("OpType", entry.OpType.String()), zap.String("key", hex.EncodeToString(entry.Key)))
continue
}
}

revent, err := assembleRowEvent(regionID, entry, w.enableOldValue)
Expand Down
27 changes: 19 additions & 8 deletions cdc/pkg/util/kv_filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,14 @@ package util

import (
"bytes"
"encoding/hex"
"fmt"
"io"
"regexp"

"github.com/pingcap/kvproto/pkg/cdcpb"
"github.com/pingcap/log"
"go.uber.org/zap"
"golang.org/x/net/html/charset"
)

Expand Down Expand Up @@ -81,25 +84,33 @@ func CreateFilter(conf *KvFilterConfig) *KvFilter {
}
}

func (f *KvFilter) EventMatch(entry *cdcpb.Event_Row) bool {
// Key of entry is expected to be in RawKV APIv2 format.
// Return error if not.
func (f *KvFilter) EventMatch(entry *cdcpb.Event_Row) (bool, error) {
// Filter on put & delete only.
if entry.GetOpType() != cdcpb.Event_Row_DELETE && entry.GetOpType() != cdcpb.Event_Row_PUT {
return true
return true, nil
}

if len(f.keyPrefix) > 0 && !bytes.HasPrefix(entry.Key, f.keyPrefix) {
return false
userKey, err := DecodeV2Key(entry.Key)
if err != nil {
log.Warn("Unexpected key not in RawKV V2 format", zap.String("entry.Key", hex.EncodeToString(entry.Key)), zap.Error(err))
return false, err
}
if f.keyPattern != nil && !f.keyPattern.MatchString(ConvertToUTF8(entry.Key, "latin1")) {
return false

if len(f.keyPrefix) > 0 && !bytes.HasPrefix(userKey, f.keyPrefix) {
return false, nil
}
if f.keyPattern != nil && !f.keyPattern.MatchString(ConvertToUTF8(userKey, "latin1")) {
return false, nil
}
if entry.GetOpType() == cdcpb.Event_Row_PUT &&
f.valuePattern != nil &&
!f.valuePattern.MatchString(ConvertToUTF8(entry.GetValue(), "latin1")) {
return false
return false, nil
}

return true
return true, nil
}

func ConvertToUTF8(strBytes []byte, origEncoding string) string {
Expand Down
33 changes: 27 additions & 6 deletions cdc/pkg/util/kv_filter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ func TestKvFilterMatch(t *testing.T) {

entry := cdcpb.Event_Row{
OpType: cdcpb.Event_Row_PUT,
Key: []byte("key\x00\x11pattern"),
Key: []byte("r\x00\x00\x00key\x00\x11pattern"),
Value: []byte("value\xaa\xffpattern"),
}

Expand All @@ -63,7 +63,9 @@ func TestKvFilterMatch(t *testing.T) {
for _, c := range keyPrefixCases {
conf := KvFilterConfig{KeyPrefix: c.pattern}
filter := CreateFilter(&conf)
assert.Equalf(c.match, filter.EventMatch(&entry), "pattern: %s", c.pattern)
matched, err := filter.EventMatch(&entry)
assert.NoError(err)
assert.Equalf(c.match, matched, "pattern: %s", c.pattern)
}

keyPatternCases := []testCase{
Expand All @@ -80,7 +82,9 @@ func TestKvFilterMatch(t *testing.T) {
for _, c := range keyPatternCases {
conf := KvFilterConfig{KeyPattern: c.pattern}
filter := CreateFilter(&conf)
assert.Equalf(c.match, filter.EventMatch(&entry), "pattern: %s", c.pattern)
matched, err := filter.EventMatch(&entry)
assert.NoError(err)
assert.Equalf(c.match, matched, "pattern: %s", c.pattern)
}

valuePatternCases := []testCase{
Expand All @@ -90,14 +94,16 @@ func TestKvFilterMatch(t *testing.T) {
for _, c := range valuePatternCases {
conf := KvFilterConfig{ValuePattern: c.pattern}
filter := CreateFilter(&conf)
assert.Equalf(c.match, filter.EventMatch(&entry), "pattern: %s", c.pattern)
matched, err := filter.EventMatch(&entry)
assert.NoError(err)
assert.Equalf(c.match, matched, "pattern: %s", c.pattern)
}

// delete entry
{
entry := cdcpb.Event_Row{
OpType: cdcpb.Event_Row_DELETE,
Key: []byte("key\x00\x11pattern"),
Key: []byte("r\x00\x00\x00key\x00\x11pattern"),
Value: []byte(""),
}
conf := KvFilterConfig{
Expand All @@ -106,6 +112,21 @@ func TestKvFilterMatch(t *testing.T) {
ValuePattern: `value`,
}
filter := CreateFilter(&conf)
assert.True(filter.EventMatch(&entry))
matched, err := filter.EventMatch(&entry)
assert.NoError(err)
assert.True(matched)
}

// not RawKV V2 key
{
entry := cdcpb.Event_Row{
OpType: cdcpb.Event_Row_DELETE,
Key: []byte("k"),
Value: []byte(""),
}
conf := KvFilterConfig{}
filter := CreateFilter(&conf)
_, err := filter.EventMatch(&entry)
assert.Error(err)
}
}
3 changes: 2 additions & 1 deletion cdc/tests/integration_tests/kv_filter/conf/changefeed.toml
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
[filter]
key-pattern = "indexInfo_:_pf01_:_APD0101_:_0{15}[3-9]"
key-prefix = "indexInfo_:_pf01_:"
key-pattern = "_APD0101_:_0{15}[3-9]"

0 comments on commit c305708

Please sign in to comment.