Skip to content

Commit

Permalink
Pipeline stability fixes (#522)
Browse files Browse the repository at this point in the history
* Semaphore (#451)

* implement ticket queue

* experiment with ordered semaphore

* ticketqueue benchmarks

* reduce allocations

* remove ticketqueue (semaphore implementation is more performant)

* optimize semaphore for our use case

* fix linter warnings, better benchmarks

* better docs

* go mod tidy

* use cerrors.New

* improve benchmarks

* fix linter error

* add comments

* simplify implementation

* Source Acker Node (#483)

* implement ticket queue

* experiment with ordered semaphore

* ticketqueue benchmarks

* reduce allocations

* remove ticketqueue (semaphore implementation is more performant)

* optimize semaphore for our use case

* fix linter warnings, better benchmarks

* better docs

* go mod tidy

* rename AckerNode to DestinationAckerNode

* remove message status change middleware to ensure all message handlers are called

* implement SourceAckerNode

* add todo note about possible deadlock

* source acker node test

* don't forward acks after a failed ack/nack

* use cerrors

* use cerrors.New

* use LogOrReplace

* improve benchmarks

* fix linter error

* add comments

* simplify implementation

* update semaphore

* update param name

* remove redundant if clause

* Remove message status dropped (#487)

* implement ticket queue

* experiment with ordered semaphore

* ticketqueue benchmarks

* reduce allocations

* remove ticketqueue (semaphore implementation is more performant)

* optimize semaphore for our use case

* fix linter warnings, better benchmarks

* better docs

* go mod tidy

* rename AckerNode to DestinationAckerNode

* remove message status change middleware to ensure all message handlers are called

* implement SourceAckerNode

* add todo note about possible deadlock

* source acker node test

* remove message status dropped

* document behavior, fanout node return nacked message error

* don't forward acks after a failed ack/nack

* use cerrors

* use cerrors.New

* use LogOrReplace

* improve benchmarks

* fix linter error

* add comments

* simplify implementation

* update semaphore

* update param name

* remove redundant if clause

* Last position handling (#504)

* implement ticket queue

* experiment with ordered semaphore

* ticketqueue benchmarks

* reduce allocations

* remove ticketqueue (semaphore implementation is more performant)

* optimize semaphore for our use case

* fix linter warnings, better benchmarks

* better docs

* go mod tidy

* rename AckerNode to DestinationAckerNode

* remove message status change middleware to ensure all message handlers are called

* implement SourceAckerNode

* add todo note about possible deadlock

* source acker node test

* remove message status dropped

* document behavior, fanout node return nacked message error

* don't forward acks after a failed ack/nack

* use cerrors

* update plugin interface

* update standalone plugin implementation

* update builtin plugin implementation

* update connector

* update nodes

* change plugin semantics, close stream on teardown

* refactor stream, reuse it in source and destination

* lock stream when stopping

* create control message for source stop

* forward last position to destination

* update connector SDK, fix race condition in source node

* make Conduit in charge of closing connector streams

* Change plugin semantics around teardown - internal connector entity is
  now in charge of closing the stream instead of plugin.
* Map known gRPC errors to internal type (context.Canceled).
* Rewrite DestinationAckerNode to be a regular node staning after
  DestinationNode, receiving messages and triggering ack receiving. This
  makes the structure simpler and in line with all other nodes.
* Create OpenMessagesTracker to simplify tracking open messages in
  SourceNode and DestinationNode.

* destination acker tests

* use cerrors.New

* use LogOrReplace

* use LogOrReplace

* make signal channel buffered

* improve benchmarks

* fix linter error

* add comments

* simplify implementation

* update semaphore

* update param name

* remove redundant if clause

* make it possible only to inject control messages

* improve destination acker caching test

* remove TODO comment

* update comment
  • Loading branch information
lovromazgon authored Jul 26, 2022
1 parent fecba0b commit 83b97b2
Show file tree
Hide file tree
Showing 44 changed files with 2,157 additions and 1,553 deletions.
8 changes: 5 additions & 3 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,12 @@ require (
github.com/conduitio/conduit-connector-generator v0.1.1
github.com/conduitio/conduit-connector-kafka v0.1.1
github.com/conduitio/conduit-connector-postgres v0.1.0
github.com/conduitio/conduit-connector-protocol v0.2.0
github.com/conduitio/conduit-connector-protocol v0.2.1-0.20220608133528-f466a956bd4d
github.com/conduitio/conduit-connector-s3 v0.1.1
github.com/conduitio/conduit-connector-sdk v0.2.0
github.com/conduitio/conduit-connector-sdk v0.2.1-0.20220622151135-47f1a8905435
github.com/dgraph-io/badger/v3 v3.2103.2
github.com/dop251/goja v0.0.0-20210225094849-f3cfc97811c0
github.com/gammazero/deque v0.2.0
github.com/golang/mock v1.6.0
github.com/google/go-cmp v0.5.8
github.com/google/uuid v1.3.0
Expand Down Expand Up @@ -123,13 +124,14 @@ require (
github.com/xitongsys/parquet-go-source v0.0.0-20220315005136-aec0fe3e777c // indirect
go.opencensus.io v0.23.0 // indirect
go.uber.org/atomic v1.9.0 // indirect
go.uber.org/goleak v1.1.12 // indirect
go.uber.org/multierr v1.8.0 // indirect
go.uber.org/zap v1.21.0 // indirect
golang.org/x/crypto v0.0.0-20220511200225-c6db032c6c88 // indirect
golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4 // indirect
golang.org/x/net v0.0.0-20220624214902-1bab6f366d9e // indirect
golang.org/x/sync v0.0.0-20220601150217-0de741cfad7f // indirect
golang.org/x/sys v0.0.0-20220615213510-4f61da869c0c // indirect
golang.org/x/sys v0.0.0-20220627191245-f75cf1eec38b // indirect
golang.org/x/term v0.0.0-20220526004731-065cf7ba2467 // indirect
golang.org/x/text v0.3.7 // indirect
gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 // indirect
Expand Down
18 changes: 11 additions & 7 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -155,12 +155,12 @@ github.com/conduitio/conduit-connector-kafka v0.1.1 h1:RgN4nafEWjpA4VvXLdSQBNrEO
github.com/conduitio/conduit-connector-kafka v0.1.1/go.mod h1:+CbMUq4fIMxFnrINtjxuVTW5TZYa549WJXQFb63GaIU=
github.com/conduitio/conduit-connector-postgres v0.1.0 h1:Dj2S1NrwnJaUOgQqb9MjGSl2vv2gre0mSFE2Ne/5OSE=
github.com/conduitio/conduit-connector-postgres v0.1.0/go.mod h1:ug4N+2pGKDbG5UN++w7xRqb0A5Ua2J5Ld5wUzLbU1Q0=
github.com/conduitio/conduit-connector-protocol v0.2.0 h1:gwYXVKEMgTtU67ephQ5WwTGIDbT/eTLA9Mdr9Bnbqxc=
github.com/conduitio/conduit-connector-protocol v0.2.0/go.mod h1:udCU2AkLcYQoLjAO06tHVL2iFJPw+DamK+wllnj50hk=
github.com/conduitio/conduit-connector-protocol v0.2.1-0.20220608133528-f466a956bd4d h1:f3R0yPiH45hDZwNcYMSzKJP6LOGQPELCqW9OkZmd2lA=
github.com/conduitio/conduit-connector-protocol v0.2.1-0.20220608133528-f466a956bd4d/go.mod h1:1nmTaD+l3mvq3PnMmPPx8UxHPM53Xk8zGT3URu2Xx2M=
github.com/conduitio/conduit-connector-s3 v0.1.1 h1:10uIakNmF65IN5TNJB1qPWC6vbdGgrHEMg8r+dxDrc8=
github.com/conduitio/conduit-connector-s3 v0.1.1/go.mod h1:xpfBzOGjZkkglTmF1444qEjXuEx+do1PTYZNroPFcSE=
github.com/conduitio/conduit-connector-sdk v0.2.0 h1:yReJT3SOAGqJIlk59WC5FPgpv0Gg+NG4NFj6FJ89XnM=
github.com/conduitio/conduit-connector-sdk v0.2.0/go.mod h1:zZ/YJqhIZyXdVmFJS55zqkukpBmB+ohbX2kDduoj8Z0=
github.com/conduitio/conduit-connector-sdk v0.2.1-0.20220622151135-47f1a8905435 h1:/bjfGf/vG8vV5WjDb7vcsluVxPZVvfsYRF4nhzJg8q4=
github.com/conduitio/conduit-connector-sdk v0.2.1-0.20220622151135-47f1a8905435/go.mod h1:RVVcsR1JBSyN8cxzjBVMyTKDym3KS6MXD2Lons/Wsw4=
github.com/coreos/etcd v3.3.10+incompatible/go.mod h1:uF7uidLiAD3TWHmW31ZFd/JWoc32PjwdhPthX9715RE=
github.com/coreos/go-etcd v2.0.0+incompatible/go.mod h1:Jez6KQU2B/sWsbdaef3ED8NzMklzPG4d5KIOhIy30Tk=
github.com/coreos/go-semver v0.2.0/go.mod h1:nnelYz7RCh+5ahJtPPxZlU+153eP4D4r3EedlOD2RNk=
Expand Down Expand Up @@ -205,6 +205,8 @@ github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMo
github.com/fsnotify/fsnotify v1.4.9/go.mod h1:znqG4EE+3YCdAaPaxE2ZRY/06pZUdp0tY4IgpuI1SZQ=
github.com/fsnotify/fsnotify v1.5.1 h1:mZcQUHVQUQWoPXXtuf9yuEXKudkV2sx1E06UadKWpgI=
github.com/fsnotify/fsnotify v1.5.1/go.mod h1:T3375wBYaZdLLcVNkcVbzGHY7f1l/uK5T5Ai1i3InKU=
github.com/gammazero/deque v0.2.0 h1:SkieyNB4bg2/uZZLxvya0Pq6diUlwx7m2TeT7GAIWaA=
github.com/gammazero/deque v0.2.0/go.mod h1:LFroj8x4cMYCukHJDbxFCkT+r9AndaJnFMuZDV34tuU=
github.com/ghodss/yaml v1.0.0/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeMEF04=
github.com/go-chi/chi/v5 v5.0.7 h1:rDTPXLDHGATaeHvVlLcR4Qe0zftYethFucbjVQ1PxU8=
github.com/go-chi/chi/v5 v5.0.7/go.mod h1:DslCQbL2OYiznFReuXYUmQ2hGd1aDpCnlMNITLSKoi8=
Expand Down Expand Up @@ -610,8 +612,9 @@ go.uber.org/atomic v1.6.0/go.mod h1:sABNBOSYdrvTF6hTgEIbc7YasKWGhgEQZyfxyTvoXHQ=
go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc=
go.uber.org/atomic v1.9.0 h1:ECmE8Bn/WFTYwEW/bpKD3M8VtR/zQVbavAoalC1PYyE=
go.uber.org/atomic v1.9.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc=
go.uber.org/goleak v1.1.11 h1:wy28qYRKZgnJTxGxvye5/wgWr1EKjmUDGYox5mGlRlI=
go.uber.org/goleak v1.1.11/go.mod h1:cwTWslyiVhfpKIDGSZEM2HlOvcqm+tG4zioyIeLoqMQ=
go.uber.org/goleak v1.1.12 h1:gZAh5/EyT/HQwlpkCy6wTpqfH9H8Lz8zbm3dZh+OyzA=
go.uber.org/goleak v1.1.12/go.mod h1:cwTWslyiVhfpKIDGSZEM2HlOvcqm+tG4zioyIeLoqMQ=
go.uber.org/multierr v1.1.0/go.mod h1:wR5kodmAFQ0UK8QlbwjlSNy0Z68gJhDJUG5sjR94q/0=
go.uber.org/multierr v1.3.0/go.mod h1:VgVr7evmIr6uPjLBxg28wmKNXyqE9akIJ5XnfpiKl+4=
go.uber.org/multierr v1.5.0/go.mod h1:FeouvMocqHpRaaGuG9EjoKcStLC43Zu/fmqdUMPcKYU=
Expand Down Expand Up @@ -676,6 +679,7 @@ golang.org/x/lint v0.0.0-20190930215403-16217165b5de/go.mod h1:6SW0HCj/g11FgYtHl
golang.org/x/lint v0.0.0-20191125180803-fdd1cda4f05f/go.mod h1:5qLYkcX4OjUUV8bRuDixDT3tpyyb+LUpUlRWLxfhWrs=
golang.org/x/lint v0.0.0-20200130185559-910be7a94367/go.mod h1:3xt1FjdF8hUf6vQPIChWIBhFzV8gjjsPE/fR3IyQdNY=
golang.org/x/lint v0.0.0-20200302205851-738671d3881b/go.mod h1:3xt1FjdF8hUf6vQPIChWIBhFzV8gjjsPE/fR3IyQdNY=
golang.org/x/lint v0.0.0-20210508222113-6edffad5e616 h1:VLliZ0d+/avPrXXH+OakdXhpJuEoBZuwh1m2j7U6Iug=
golang.org/x/lint v0.0.0-20210508222113-6edffad5e616/go.mod h1:3xt1FjdF8hUf6vQPIChWIBhFzV8gjjsPE/fR3IyQdNY=
golang.org/x/mobile v0.0.0-20190312151609-d3739f865fa6/go.mod h1:z+o9i4GpDbdi3rU15maQ/Ox0txvL9dWGYEHz965HBQE=
golang.org/x/mobile v0.0.0-20190719004257-d2bd2a29d028/go.mod h1:E/iHnbuqvinMTCcRqshq8CkpyQDoeVncDDYHnLhea+o=
Expand Down Expand Up @@ -805,8 +809,8 @@ golang.org/x/sys v0.0.0-20210927094055-39ccf1dd6fa6/go.mod h1:oPkhp1MJrh7nUepCBc
golang.org/x/sys v0.0.0-20211216021012-1d35b9e2eb4e/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220114195835-da31bd327af9/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220503163025-988cb79eb6c6/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220615213510-4f61da869c0c h1:aFV+BgZ4svzjfabn8ERpuB4JI4N6/rdy1iusx77G3oU=
golang.org/x/sys v0.0.0-20220615213510-4f61da869c0c/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220627191245-f75cf1eec38b h1:2n253B2r0pYSmEV+UNCQoPfU/FiaizQEK5Gu4Bq4JE8=
golang.org/x/sys v0.0.0-20220627191245-f75cf1eec38b/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/term v0.0.0-20201117132131-f5c789dd3221/go.mod h1:Nr5EML6q2oocZ2LXRh80K7BxOlk5/8JxuGnuhpl+muw=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8=
Expand Down
12 changes: 8 additions & 4 deletions pkg/connector/connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,10 +87,10 @@ type Source interface {
// processed and can be acknowledged.
Ack(context.Context, record.Position) error

// Stop signals to the source to stop producing records. Note that after
// this call Read can still produce records that have been cached by the
// connector.
Stop(context.Context) error
// Stop signals to the source to stop producing records. After this call
// Read will produce records until the record with the last position has
// been read (Conduit might have already received that record).
Stop(context.Context) (record.Position, error)
}

// Destination is a connector that can write records to a destination.
Expand All @@ -111,6 +111,10 @@ type Destination interface {
// processed and returns the position of that record. If the record wasn't
// successfully processed the function returns the position and an error.
Ack(context.Context) (record.Position, error)

// Stop signals to the destination that no more records will be produced
// after record with the last position.
Stop(context.Context, record.Position) error
}

// Config collects common data stored for a connector.
Expand Down
46 changes: 37 additions & 9 deletions pkg/connector/destination.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import (

"github.com/conduitio/conduit/pkg/foundation/cerrors"
"github.com/conduitio/conduit/pkg/foundation/log"
"github.com/conduitio/conduit/pkg/foundation/multierror"
"github.com/conduitio/conduit/pkg/plugin"
"github.com/conduitio/conduit/pkg/record"
)
Expand Down Expand Up @@ -54,12 +53,17 @@ type destination struct {
// plugin is the running instance of the destination plugin.
plugin plugin.DestinationPlugin

// stopStream is a function that closes the context of the stream
stopStream context.CancelFunc

// m can lock a destination from concurrent access (e.g. in connector persister).
m sync.Mutex
// wg tracks the number of in flight calls to the plugin.
wg sync.WaitGroup
}

var _ Destination = (*destination)(nil)

func (d *destination) ID() string {
return d.XID
}
Expand Down Expand Up @@ -146,19 +150,41 @@ func (d *destination) Open(ctx context.Context) error {
return err
}

err = dest.Start(ctx)
streamCtx, cancelStreamCtx := context.WithCancel(ctx)
err = dest.Start(streamCtx)
if err != nil {
cancelStreamCtx()
_ = dest.Teardown(ctx)
return err
}

d.logger.Info(ctx).Msg("destination connector plugin successfully started")

d.plugin = dest
d.stopStream = cancelStreamCtx
d.persister.ConnectorStarted()
return nil
}

func (d *destination) Stop(ctx context.Context, lastPosition record.Position) error {
cleanup, err := d.preparePluginCall()
defer cleanup()
if err != nil {
return err
}

d.logger.Debug(ctx).
Bytes(log.RecordPositionField, lastPosition).
Msg("sending stop signal to destination connector plugin")
err = d.plugin.Stop(ctx, lastPosition)
if err != nil {
return cerrors.Errorf("could not stop destination plugin: %w", err)
}

d.logger.Debug(ctx).Msg("destination connector plugin successfully responded to stop signal")
return nil
}

func (d *destination) Teardown(ctx context.Context) error {
// lock destination as we are about to mutate the plugin field
d.m.Lock()
Expand All @@ -167,23 +193,25 @@ func (d *destination) Teardown(ctx context.Context) error {
return plugin.ErrPluginNotRunning
}

d.logger.Debug(ctx).Msg("stopping destination connector plugin")
err := d.plugin.Stop(ctx)
// close stream
if d.stopStream != nil {
d.stopStream()
d.stopStream = nil
}

// wait for any calls to the plugin to stop running first (e.g. Ack or Write)
// wait for any calls to the plugin to stop running first (e.g. Stop, Ack or Write)
d.wg.Wait()

d.logger.Debug(ctx).Msg("tearing down destination connector plugin")
err = multierror.Append(err, d.plugin.Teardown(ctx))

err := d.plugin.Teardown(ctx)
d.plugin = nil
d.persister.ConnectorStopped()

if err != nil {
return cerrors.Errorf("could not tear down plugin: %w", err)
return cerrors.Errorf("could not tear down destination connector plugin: %w", err)
}

d.logger.Info(ctx).Msg("connector plugin successfully torn down")
d.logger.Info(ctx).Msg("destination connector plugin successfully torn down")
return nil
}

Expand Down
21 changes: 18 additions & 3 deletions pkg/connector/mock/connector.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

37 changes: 25 additions & 12 deletions pkg/connector/source.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,13 +52,16 @@ type source struct {
// plugin is the running instance of the source plugin.
plugin plugin.SourcePlugin

// stopStream is a function that closes the context of the stream
stopStream context.CancelFunc

// m can lock a source from concurrent access (e.g. in connector persister).
m sync.Mutex
// wg tracks the number of in flight calls to the plugin.
wg sync.WaitGroup
}

// not running -> running -> stopping -> not running
var _ Source = (*source)(nil)

func (s *source) ID() string {
return s.XID
Expand Down Expand Up @@ -146,34 +149,39 @@ func (s *source) Open(ctx context.Context) error {
return err
}

err = src.Start(ctx, s.XState.Position)
streamCtx, cancelStreamCtx := context.WithCancel(ctx)
err = src.Start(streamCtx, s.XState.Position)
if err != nil {
cancelStreamCtx()
_ = src.Teardown(ctx)
return err
}

s.logger.Info(ctx).Msg("source connector plugin successfully started")

s.plugin = src
s.stopStream = cancelStreamCtx
s.persister.ConnectorStarted()
return nil
}

func (s *source) Stop(ctx context.Context) error {
func (s *source) Stop(ctx context.Context) (record.Position, error) {
cleanup, err := s.preparePluginCall()
defer cleanup()
if err != nil {
return err
return nil, err
}

s.logger.Debug(ctx).Msg("stopping source connector plugin")
err = s.plugin.Stop(ctx)
s.logger.Debug(ctx).Msg("sending stop signal to source connector plugin")
lastPosition, err := s.plugin.Stop(ctx)
if err != nil {
return cerrors.Errorf("could not stop plugin: %w", err)
return nil, cerrors.Errorf("could not stop source plugin: %w", err)
}

s.logger.Info(ctx).Msg("connector plugin successfully stopped")
return nil
s.logger.Info(ctx).
Bytes(log.RecordPositionField, lastPosition).
Msg("source connector plugin successfully responded to stop signal")
return lastPosition, nil
}

func (s *source) Teardown(ctx context.Context) error {
Expand All @@ -184,21 +192,26 @@ func (s *source) Teardown(ctx context.Context) error {
return plugin.ErrPluginNotRunning
}

// close stream
if s.stopStream != nil {
s.stopStream()
s.stopStream = nil
}

// wait for any calls to the plugin to stop running first (e.g. Stop, Ack or Read)
s.wg.Wait()

s.logger.Debug(ctx).Msg("tearing down source connector plugin")

err := s.plugin.Teardown(ctx)

s.plugin = nil
s.persister.ConnectorStopped()

if err != nil {
return cerrors.Errorf("could not tear down plugin: %w", err)
return cerrors.Errorf("could not tear down source connector plugin: %w", err)
}

s.logger.Info(ctx).Msg("connector plugin successfully torn down")
s.logger.Info(ctx).Msg("source connector plugin successfully torn down")
return nil
}

Expand Down
Loading

0 comments on commit 83b97b2

Please sign in to comment.