Skip to content

Commit

Permalink
fix complie
Browse files Browse the repository at this point in the history
Signed-off-by: Neil Shen <[email protected]>
  • Loading branch information
overvenus committed Sep 15, 2021
1 parent 66b1999 commit 250916f
Show file tree
Hide file tree
Showing 2 changed files with 1 addition and 83 deletions.
2 changes: 1 addition & 1 deletion cdc/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -1229,7 +1229,7 @@ func runProcessor(
cancel()
return nil, errors.Trace(err)
}
sinkManager := sink.NewManager(ctx, s, errCh, checkpointTs)
sinkManager := sink.NewManager(ctx, s, errCh, checkpointTs, captureInfo.AdvertiseAddr, changefeedID)
processor, err := newProcessor(ctx, pdCli, grpcPool, session, info, sinkManager,
changefeedID, captureInfo, checkpointTs, errCh, flushCheckpointInterval)
if err != nil {
Expand Down
82 changes: 0 additions & 82 deletions cdc/sink/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -247,88 +247,6 @@ func (s *managerSuite) TestManagerDestroyTableSink(c *check.C) {
c.Assert(err, check.IsNil)
}

<<<<<<< HEAD
=======
// Run the benchmark
// go test -benchmem -run='^$' -bench '^(BenchmarkManagerFlushing)$' github.com/pingcap/ticdc/cdc/sink
func BenchmarkManagerFlushing(b *testing.B) {
ctx, cancel := context.WithCancel(context.Background())
errCh := make(chan error, 16)
manager := NewManager(ctx, &checkSink{}, errCh, 0, "", "")

// Init table sinks.
goroutineNum := 2000
rowNum := 2000
var wg sync.WaitGroup
tableSinks := make([]Sink, goroutineNum)
for i := 0; i < goroutineNum; i++ {
i := i
wg.Add(1)
go func() {
defer wg.Done()
tableSinks[i] = manager.CreateTableSink(model.TableID(i), 0)
}()
}
wg.Wait()

// Concurrent emit events.
for i := 0; i < goroutineNum; i++ {
i := i
tableSink := tableSinks[i]
wg.Add(1)
go func() {
defer wg.Done()
for j := 1; j < rowNum; j++ {
err := tableSink.EmitRowChangedEvents(context.Background(), &model.RowChangedEvent{
Table: &model.TableName{TableID: int64(i)},
CommitTs: uint64(j),
})
if err != nil {
b.Error(err)
}
}
}()
}
wg.Wait()

// All tables are flushed concurrently, except table 0.
for i := 1; i < goroutineNum; i++ {
i := i
tableSink := tableSinks[i]
go func() {
for j := 1; j < rowNum; j++ {
if j%2 == 0 {
_, err := tableSink.FlushRowChangedEvents(context.Background(), uint64(j))
if err != nil {
b.Error(err)
}
}
}
}()
}

b.ResetTimer()
// Table 0 flush.
tableSink := tableSinks[0]
for i := 0; i < b.N; i++ {
_, err := tableSink.FlushRowChangedEvents(context.Background(), uint64(rowNum))
if err != nil {
b.Error(err)
}
}
b.StopTimer()

cancel()
_ = manager.Close(ctx)
close(errCh)
for err := range errCh {
if err != nil {
b.Error(err)
}
}
}

>>>>>>> 06a674ba6 (metrics: add data flow metrics (#2763))
type errorSink struct {
*check.C
}
Expand Down

0 comments on commit 250916f

Please sign in to comment.