Skip to content

Commit

Permalink
Last position handling (#504)
Browse files Browse the repository at this point in the history
* implement ticket queue

* experiment with ordered semaphore

* ticketqueue benchmarks

* reduce allocations

* remove ticketqueue (semaphore implementation is more performant)

* optimize semaphore for our use case

* fix linter warnings, better benchmarks

* better docs

* go mod tidy

* rename AckerNode to DestinationAckerNode

* remove message status change middleware to ensure all message handlers are called

* implement SourceAckerNode

* add todo note about possible deadlock

* source acker node test

* remove message status dropped

* document behavior, fanout node return nacked message error

* don't forward acks after a failed ack/nack

* use cerrors

* update plugin interface

* update standalone plugin implementation

* update builtin plugin implementation

* update connector

* update nodes

* change plugin semantics, close stream on teardown

* refactor stream, reuse it in source and destination

* lock stream when stopping

* create control message for source stop

* forward last position to destination

* update connector SDK, fix race condition in source node

* make Conduit in charge of closing connector streams

* Change plugin semantics around teardown - internal connector entity is
  now in charge of closing the stream instead of plugin.
* Map known gRPC errors to internal type (context.Canceled).
* Rewrite DestinationAckerNode to be a regular node staning after
  DestinationNode, receiving messages and triggering ack receiving. This
  makes the structure simpler and in line with all other nodes.
* Create OpenMessagesTracker to simplify tracking open messages in
  SourceNode and DestinationNode.

* destination acker tests

* use cerrors.New

* use LogOrReplace

* use LogOrReplace

* make signal channel buffered

* improve benchmarks

* fix linter error

* add comments

* simplify implementation

* update semaphore

* update param name

* remove redundant if clause

* make it possible only to inject control messages

* improve destination acker caching test

* remove TODO comment

* update comment
  • Loading branch information
lovromazgon authored Jul 26, 2022
1 parent 1f3e40e commit 44d5f00
Show file tree
Hide file tree
Showing 32 changed files with 1,041 additions and 903 deletions.
12 changes: 7 additions & 5 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,9 @@ require (
github.com/conduitio/conduit-connector-generator v0.1.0
github.com/conduitio/conduit-connector-kafka v0.1.1
github.com/conduitio/conduit-connector-postgres v0.1.0
github.com/conduitio/conduit-connector-protocol v0.2.0
github.com/conduitio/conduit-connector-protocol v0.2.1-0.20220608133528-f466a956bd4d
github.com/conduitio/conduit-connector-s3 v0.1.1
github.com/conduitio/conduit-connector-sdk v0.2.0
github.com/conduitio/conduit-connector-sdk v0.2.1-0.20220622151135-47f1a8905435
github.com/dgraph-io/badger/v3 v3.2103.2
github.com/dop251/goja v0.0.0-20210225094849-f3cfc97811c0
github.com/golang/mock v1.6.0
Expand All @@ -33,7 +33,7 @@ require (
go.buf.build/library/go-grpc/conduitio/conduit-connector-protocol v1.4.2
golang.org/x/tools v0.1.11
golang.org/x/xerrors v0.0.0-20220517211312-f3a8303e98df
google.golang.org/genproto v0.0.0-20220617124728-180714bec0ad
google.golang.org/genproto v0.0.0-20220630150403-404d0664e509
google.golang.org/grpc v1.47.0
google.golang.org/grpc/cmd/protoc-gen-go-grpc v1.2.0
google.golang.org/protobuf v1.28.0
Expand Down Expand Up @@ -73,6 +73,7 @@ require (
github.com/dustin/go-humanize v1.0.0 // indirect
github.com/fatih/color v1.13.0 // indirect
github.com/fsnotify/fsnotify v1.5.1 // indirect
github.com/gammazero/deque v0.2.0 // indirect
github.com/go-chi/chi/v5 v5.0.7 // indirect
github.com/go-sourcemap/sourcemap v2.1.3+incompatible // indirect
github.com/gofrs/flock v0.8.1 // indirect
Expand Down Expand Up @@ -123,13 +124,14 @@ require (
github.com/xitongsys/parquet-go-source v0.0.0-20220315005136-aec0fe3e777c // indirect
go.opencensus.io v0.23.0 // indirect
go.uber.org/atomic v1.9.0 // indirect
go.uber.org/goleak v1.1.12 // indirect
go.uber.org/multierr v1.8.0 // indirect
go.uber.org/zap v1.21.0 // indirect
golang.org/x/crypto v0.0.0-20220511200225-c6db032c6c88 // indirect
golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4 // indirect
golang.org/x/net v0.0.0-20220617184016-355a448f1bc9 // indirect
golang.org/x/net v0.0.0-20220624214902-1bab6f366d9e // indirect
golang.org/x/sync v0.0.0-20220601150217-0de741cfad7f // indirect
golang.org/x/sys v0.0.0-20220615213510-4f61da869c0c // indirect
golang.org/x/sys v0.0.0-20220627191245-f75cf1eec38b // indirect
golang.org/x/term v0.0.0-20220526004731-065cf7ba2467 // indirect
golang.org/x/text v0.3.7 // indirect
gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 // indirect
Expand Down
26 changes: 15 additions & 11 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -155,12 +155,12 @@ github.com/conduitio/conduit-connector-kafka v0.1.1 h1:RgN4nafEWjpA4VvXLdSQBNrEO
github.com/conduitio/conduit-connector-kafka v0.1.1/go.mod h1:+CbMUq4fIMxFnrINtjxuVTW5TZYa549WJXQFb63GaIU=
github.com/conduitio/conduit-connector-postgres v0.1.0 h1:Dj2S1NrwnJaUOgQqb9MjGSl2vv2gre0mSFE2Ne/5OSE=
github.com/conduitio/conduit-connector-postgres v0.1.0/go.mod h1:ug4N+2pGKDbG5UN++w7xRqb0A5Ua2J5Ld5wUzLbU1Q0=
github.com/conduitio/conduit-connector-protocol v0.2.0 h1:gwYXVKEMgTtU67ephQ5WwTGIDbT/eTLA9Mdr9Bnbqxc=
github.com/conduitio/conduit-connector-protocol v0.2.0/go.mod h1:udCU2AkLcYQoLjAO06tHVL2iFJPw+DamK+wllnj50hk=
github.com/conduitio/conduit-connector-protocol v0.2.1-0.20220608133528-f466a956bd4d h1:f3R0yPiH45hDZwNcYMSzKJP6LOGQPELCqW9OkZmd2lA=
github.com/conduitio/conduit-connector-protocol v0.2.1-0.20220608133528-f466a956bd4d/go.mod h1:1nmTaD+l3mvq3PnMmPPx8UxHPM53Xk8zGT3URu2Xx2M=
github.com/conduitio/conduit-connector-s3 v0.1.1 h1:10uIakNmF65IN5TNJB1qPWC6vbdGgrHEMg8r+dxDrc8=
github.com/conduitio/conduit-connector-s3 v0.1.1/go.mod h1:xpfBzOGjZkkglTmF1444qEjXuEx+do1PTYZNroPFcSE=
github.com/conduitio/conduit-connector-sdk v0.2.0 h1:yReJT3SOAGqJIlk59WC5FPgpv0Gg+NG4NFj6FJ89XnM=
github.com/conduitio/conduit-connector-sdk v0.2.0/go.mod h1:zZ/YJqhIZyXdVmFJS55zqkukpBmB+ohbX2kDduoj8Z0=
github.com/conduitio/conduit-connector-sdk v0.2.1-0.20220622151135-47f1a8905435 h1:/bjfGf/vG8vV5WjDb7vcsluVxPZVvfsYRF4nhzJg8q4=
github.com/conduitio/conduit-connector-sdk v0.2.1-0.20220622151135-47f1a8905435/go.mod h1:RVVcsR1JBSyN8cxzjBVMyTKDym3KS6MXD2Lons/Wsw4=
github.com/coreos/etcd v3.3.10+incompatible/go.mod h1:uF7uidLiAD3TWHmW31ZFd/JWoc32PjwdhPthX9715RE=
github.com/coreos/go-etcd v2.0.0+incompatible/go.mod h1:Jez6KQU2B/sWsbdaef3ED8NzMklzPG4d5KIOhIy30Tk=
github.com/coreos/go-semver v0.2.0/go.mod h1:nnelYz7RCh+5ahJtPPxZlU+153eP4D4r3EedlOD2RNk=
Expand Down Expand Up @@ -205,6 +205,8 @@ github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMo
github.com/fsnotify/fsnotify v1.4.9/go.mod h1:znqG4EE+3YCdAaPaxE2ZRY/06pZUdp0tY4IgpuI1SZQ=
github.com/fsnotify/fsnotify v1.5.1 h1:mZcQUHVQUQWoPXXtuf9yuEXKudkV2sx1E06UadKWpgI=
github.com/fsnotify/fsnotify v1.5.1/go.mod h1:T3375wBYaZdLLcVNkcVbzGHY7f1l/uK5T5Ai1i3InKU=
github.com/gammazero/deque v0.2.0 h1:SkieyNB4bg2/uZZLxvya0Pq6diUlwx7m2TeT7GAIWaA=
github.com/gammazero/deque v0.2.0/go.mod h1:LFroj8x4cMYCukHJDbxFCkT+r9AndaJnFMuZDV34tuU=
github.com/ghodss/yaml v1.0.0/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeMEF04=
github.com/go-chi/chi/v5 v5.0.7 h1:rDTPXLDHGATaeHvVlLcR4Qe0zftYethFucbjVQ1PxU8=
github.com/go-chi/chi/v5 v5.0.7/go.mod h1:DslCQbL2OYiznFReuXYUmQ2hGd1aDpCnlMNITLSKoi8=
Expand Down Expand Up @@ -610,8 +612,9 @@ go.uber.org/atomic v1.6.0/go.mod h1:sABNBOSYdrvTF6hTgEIbc7YasKWGhgEQZyfxyTvoXHQ=
go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc=
go.uber.org/atomic v1.9.0 h1:ECmE8Bn/WFTYwEW/bpKD3M8VtR/zQVbavAoalC1PYyE=
go.uber.org/atomic v1.9.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc=
go.uber.org/goleak v1.1.11 h1:wy28qYRKZgnJTxGxvye5/wgWr1EKjmUDGYox5mGlRlI=
go.uber.org/goleak v1.1.11/go.mod h1:cwTWslyiVhfpKIDGSZEM2HlOvcqm+tG4zioyIeLoqMQ=
go.uber.org/goleak v1.1.12 h1:gZAh5/EyT/HQwlpkCy6wTpqfH9H8Lz8zbm3dZh+OyzA=
go.uber.org/goleak v1.1.12/go.mod h1:cwTWslyiVhfpKIDGSZEM2HlOvcqm+tG4zioyIeLoqMQ=
go.uber.org/multierr v1.1.0/go.mod h1:wR5kodmAFQ0UK8QlbwjlSNy0Z68gJhDJUG5sjR94q/0=
go.uber.org/multierr v1.3.0/go.mod h1:VgVr7evmIr6uPjLBxg28wmKNXyqE9akIJ5XnfpiKl+4=
go.uber.org/multierr v1.5.0/go.mod h1:FeouvMocqHpRaaGuG9EjoKcStLC43Zu/fmqdUMPcKYU=
Expand Down Expand Up @@ -676,6 +679,7 @@ golang.org/x/lint v0.0.0-20190930215403-16217165b5de/go.mod h1:6SW0HCj/g11FgYtHl
golang.org/x/lint v0.0.0-20191125180803-fdd1cda4f05f/go.mod h1:5qLYkcX4OjUUV8bRuDixDT3tpyyb+LUpUlRWLxfhWrs=
golang.org/x/lint v0.0.0-20200130185559-910be7a94367/go.mod h1:3xt1FjdF8hUf6vQPIChWIBhFzV8gjjsPE/fR3IyQdNY=
golang.org/x/lint v0.0.0-20200302205851-738671d3881b/go.mod h1:3xt1FjdF8hUf6vQPIChWIBhFzV8gjjsPE/fR3IyQdNY=
golang.org/x/lint v0.0.0-20210508222113-6edffad5e616 h1:VLliZ0d+/avPrXXH+OakdXhpJuEoBZuwh1m2j7U6Iug=
golang.org/x/lint v0.0.0-20210508222113-6edffad5e616/go.mod h1:3xt1FjdF8hUf6vQPIChWIBhFzV8gjjsPE/fR3IyQdNY=
golang.org/x/mobile v0.0.0-20190312151609-d3739f865fa6/go.mod h1:z+o9i4GpDbdi3rU15maQ/Ox0txvL9dWGYEHz965HBQE=
golang.org/x/mobile v0.0.0-20190719004257-d2bd2a29d028/go.mod h1:E/iHnbuqvinMTCcRqshq8CkpyQDoeVncDDYHnLhea+o=
Expand Down Expand Up @@ -726,8 +730,8 @@ golang.org/x/net v0.0.0-20210525063256-abc453219eb5/go.mod h1:9nx3DQGgdP8bBQD5qx
golang.org/x/net v0.0.0-20210614182718-04defd469f4e/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
golang.org/x/net v0.0.0-20220127200216-cd36cc0744dd/go.mod h1:CfG3xpIq0wQ8r1q4Su4UZFWDARRcnwPjda9FqA0JpMk=
golang.org/x/net v0.0.0-20220225172249-27dd8689420f/go.mod h1:CfG3xpIq0wQ8r1q4Su4UZFWDARRcnwPjda9FqA0JpMk=
golang.org/x/net v0.0.0-20220617184016-355a448f1bc9 h1:Yqz/iviulwKwAREEeUd3nbBFn0XuyJqkoft2IlrvOhc=
golang.org/x/net v0.0.0-20220617184016-355a448f1bc9/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c=
golang.org/x/net v0.0.0-20220624214902-1bab6f366d9e h1:TsQ7F31D3bUCLeqPT0u+yjp1guoArKaNKmCr22PYgTQ=
golang.org/x/net v0.0.0-20220624214902-1bab6f366d9e/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c=
golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
golang.org/x/oauth2 v0.0.0-20190226205417-e64efc72b421/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw=
golang.org/x/oauth2 v0.0.0-20190604053449-0f29369cfe45/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw=
Expand Down Expand Up @@ -805,8 +809,8 @@ golang.org/x/sys v0.0.0-20210927094055-39ccf1dd6fa6/go.mod h1:oPkhp1MJrh7nUepCBc
golang.org/x/sys v0.0.0-20211216021012-1d35b9e2eb4e/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220114195835-da31bd327af9/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220503163025-988cb79eb6c6/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220615213510-4f61da869c0c h1:aFV+BgZ4svzjfabn8ERpuB4JI4N6/rdy1iusx77G3oU=
golang.org/x/sys v0.0.0-20220615213510-4f61da869c0c/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220627191245-f75cf1eec38b h1:2n253B2r0pYSmEV+UNCQoPfU/FiaizQEK5Gu4Bq4JE8=
golang.org/x/sys v0.0.0-20220627191245-f75cf1eec38b/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/term v0.0.0-20201117132131-f5c789dd3221/go.mod h1:Nr5EML6q2oocZ2LXRh80K7BxOlk5/8JxuGnuhpl+muw=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8=
Expand Down Expand Up @@ -948,8 +952,8 @@ google.golang.org/genproto v0.0.0-20200729003335-053ba62fc06f/go.mod h1:FWY/as6D
google.golang.org/genproto v0.0.0-20200804131852-c06518451d9c/go.mod h1:FWY/as6DDZQgahTzZj3fqbO1CbirC29ZNUFHwi0/+no=
google.golang.org/genproto v0.0.0-20200825200019-8632dd797987/go.mod h1:FWY/as6DDZQgahTzZj3fqbO1CbirC29ZNUFHwi0/+no=
google.golang.org/genproto v0.0.0-20210630183607-d20f26d13c79/go.mod h1:yiaVoXHpRzHGyxV3o4DktVWY4mSUErTKaeEOq6C3t3U=
google.golang.org/genproto v0.0.0-20220617124728-180714bec0ad h1:kqrS+lhvaMHCxul6sKQvKJ8nAAhlVItmZV822hYFH/U=
google.golang.org/genproto v0.0.0-20220617124728-180714bec0ad/go.mod h1:KEWEmljWE5zPzLBa/oHl6DaEt9LmfH6WtH1OHIvleBA=
google.golang.org/genproto v0.0.0-20220630150403-404d0664e509 h1:eUofWZEQ3SqKIW6WImdM2sxVVjnL0ahOYuIYC6WEYI8=
google.golang.org/genproto v0.0.0-20220630150403-404d0664e509/go.mod h1:KEWEmljWE5zPzLBa/oHl6DaEt9LmfH6WtH1OHIvleBA=
google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c=
google.golang.org/grpc v1.20.1/go.mod h1:10oTOabMzJvdu6/UiuZezV6QK5dSlG84ov/aaiqXj38=
google.golang.org/grpc v1.21.1/go.mod h1:oYelfM1adQP15Ek0mdvEgi9Df8B9CZIaU1084ijfRaM=
Expand Down
12 changes: 8 additions & 4 deletions pkg/connector/connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,10 +87,10 @@ type Source interface {
// processed and can be acknowledged.
Ack(context.Context, record.Position) error

// Stop signals to the source to stop producing records. Note that after
// this call Read can still produce records that have been cached by the
// connector.
Stop(context.Context) error
// Stop signals to the source to stop producing records. After this call
// Read will produce records until the record with the last position has
// been read (Conduit might have already received that record).
Stop(context.Context) (record.Position, error)
}

// Destination is a connector that can write records to a destination.
Expand All @@ -111,6 +111,10 @@ type Destination interface {
// processed and returns the position of that record. If the record wasn't
// successfully processed the function returns the position and an error.
Ack(context.Context) (record.Position, error)

// Stop signals to the destination that no more records will be produced
// after record with the last position.
Stop(context.Context, record.Position) error
}

// Config collects common data stored for a connector.
Expand Down
46 changes: 37 additions & 9 deletions pkg/connector/destination.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import (

"github.com/conduitio/conduit/pkg/foundation/cerrors"
"github.com/conduitio/conduit/pkg/foundation/log"
"github.com/conduitio/conduit/pkg/foundation/multierror"
"github.com/conduitio/conduit/pkg/plugin"
"github.com/conduitio/conduit/pkg/record"
)
Expand Down Expand Up @@ -54,12 +53,17 @@ type destination struct {
// plugin is the running instance of the destination plugin.
plugin plugin.DestinationPlugin

// stopStream is a function that closes the context of the stream
stopStream context.CancelFunc

// m can lock a destination from concurrent access (e.g. in connector persister).
m sync.Mutex
// wg tracks the number of in flight calls to the plugin.
wg sync.WaitGroup
}

var _ Destination = (*destination)(nil)

func (d *destination) ID() string {
return d.XID
}
Expand Down Expand Up @@ -146,19 +150,41 @@ func (d *destination) Open(ctx context.Context) error {
return err
}

err = dest.Start(ctx)
streamCtx, cancelStreamCtx := context.WithCancel(ctx)
err = dest.Start(streamCtx)
if err != nil {
cancelStreamCtx()
_ = dest.Teardown(ctx)
return err
}

d.logger.Info(ctx).Msg("destination connector plugin successfully started")

d.plugin = dest
d.stopStream = cancelStreamCtx
d.persister.ConnectorStarted()
return nil
}

func (d *destination) Stop(ctx context.Context, lastPosition record.Position) error {
cleanup, err := d.preparePluginCall()
defer cleanup()
if err != nil {
return err
}

d.logger.Debug(ctx).
Bytes(log.RecordPositionField, lastPosition).
Msg("sending stop signal to destination connector plugin")
err = d.plugin.Stop(ctx, lastPosition)
if err != nil {
return cerrors.Errorf("could not stop destination plugin: %w", err)
}

d.logger.Debug(ctx).Msg("destination connector plugin successfully responded to stop signal")
return nil
}

func (d *destination) Teardown(ctx context.Context) error {
// lock destination as we are about to mutate the plugin field
d.m.Lock()
Expand All @@ -167,23 +193,25 @@ func (d *destination) Teardown(ctx context.Context) error {
return plugin.ErrPluginNotRunning
}

d.logger.Debug(ctx).Msg("stopping destination connector plugin")
err := d.plugin.Stop(ctx)
// close stream
if d.stopStream != nil {
d.stopStream()
d.stopStream = nil
}

// wait for any calls to the plugin to stop running first (e.g. Ack or Write)
// wait for any calls to the plugin to stop running first (e.g. Stop, Ack or Write)
d.wg.Wait()

d.logger.Debug(ctx).Msg("tearing down destination connector plugin")
err = multierror.Append(err, d.plugin.Teardown(ctx))

err := d.plugin.Teardown(ctx)
d.plugin = nil
d.persister.ConnectorStopped()

if err != nil {
return cerrors.Errorf("could not tear down plugin: %w", err)
return cerrors.Errorf("could not tear down destination connector plugin: %w", err)
}

d.logger.Info(ctx).Msg("connector plugin successfully torn down")
d.logger.Info(ctx).Msg("destination connector plugin successfully torn down")
return nil
}

Expand Down
21 changes: 18 additions & 3 deletions pkg/connector/mock/connector.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

37 changes: 25 additions & 12 deletions pkg/connector/source.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,13 +52,16 @@ type source struct {
// plugin is the running instance of the source plugin.
plugin plugin.SourcePlugin

// stopStream is a function that closes the context of the stream
stopStream context.CancelFunc

// m can lock a source from concurrent access (e.g. in connector persister).
m sync.Mutex
// wg tracks the number of in flight calls to the plugin.
wg sync.WaitGroup
}

// not running -> running -> stopping -> not running
var _ Source = (*source)(nil)

func (s *source) ID() string {
return s.XID
Expand Down Expand Up @@ -146,34 +149,39 @@ func (s *source) Open(ctx context.Context) error {
return err
}

err = src.Start(ctx, s.XState.Position)
streamCtx, cancelStreamCtx := context.WithCancel(ctx)
err = src.Start(streamCtx, s.XState.Position)
if err != nil {
cancelStreamCtx()
_ = src.Teardown(ctx)
return err
}

s.logger.Info(ctx).Msg("source connector plugin successfully started")

s.plugin = src
s.stopStream = cancelStreamCtx
s.persister.ConnectorStarted()
return nil
}

func (s *source) Stop(ctx context.Context) error {
func (s *source) Stop(ctx context.Context) (record.Position, error) {
cleanup, err := s.preparePluginCall()
defer cleanup()
if err != nil {
return err
return nil, err
}

s.logger.Debug(ctx).Msg("stopping source connector plugin")
err = s.plugin.Stop(ctx)
s.logger.Debug(ctx).Msg("sending stop signal to source connector plugin")
lastPosition, err := s.plugin.Stop(ctx)
if err != nil {
return cerrors.Errorf("could not stop plugin: %w", err)
return nil, cerrors.Errorf("could not stop source plugin: %w", err)
}

s.logger.Info(ctx).Msg("connector plugin successfully stopped")
return nil
s.logger.Info(ctx).
Bytes(log.RecordPositionField, lastPosition).
Msg("source connector plugin successfully responded to stop signal")
return lastPosition, nil
}

func (s *source) Teardown(ctx context.Context) error {
Expand All @@ -184,21 +192,26 @@ func (s *source) Teardown(ctx context.Context) error {
return plugin.ErrPluginNotRunning
}

// close stream
if s.stopStream != nil {
s.stopStream()
s.stopStream = nil
}

// wait for any calls to the plugin to stop running first (e.g. Stop, Ack or Read)
s.wg.Wait()

s.logger.Debug(ctx).Msg("tearing down source connector plugin")

err := s.plugin.Teardown(ctx)

s.plugin = nil
s.persister.ConnectorStopped()

if err != nil {
return cerrors.Errorf("could not tear down plugin: %w", err)
return cerrors.Errorf("could not tear down source connector plugin: %w", err)
}

s.logger.Info(ctx).Msg("connector plugin successfully torn down")
s.logger.Info(ctx).Msg("source connector plugin successfully torn down")
return nil
}

Expand Down
Loading

0 comments on commit 44d5f00

Please sign in to comment.