Skip to content

Commit

Permalink
Source connectors should use ReadN
Browse files Browse the repository at this point in the history
  • Loading branch information
hariso committed Jan 30, 2025
1 parent c88dce2 commit c4c572b
Showing 1 changed file with 8 additions and 16 deletions.
24 changes: 8 additions & 16 deletions source.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,22 +64,14 @@ func (s *Source) Open(_ context.Context, _ opencdc.Position) error {
return nil
}

func (s *Source) Read(_ context.Context) (opencdc.Record, error) {
// Read returns a new Record and is supposed to block until there is either
// a new record or the context gets cancelled. It can also return the error
// ErrBackoffRetry to signal to the SDK it should call Read again with a
// backoff retry.
// If Read receives a cancelled context or the context is cancelled while
// Read is running it must stop retrieving new records from the source
// system and start returning records that have already been buffered. If
// there are no buffered records left Read must return the context error to
// signal a graceful stop. If Read returns ErrBackoffRetry while the context
// is cancelled it will also signal that there are no records left and Read
// won't be called again.
// After Read returns an error the function won't be called again (except if
// the error is ErrBackoffRetry, as mentioned above).
// Read can be called concurrently with Ack.
return opencdc.Record{}, nil
func (s *Source) ReadN(context.Context, int) ([]opencdc.Record, error) {
// ReadN is the same as Read, but returns a batch of records. The connector
// is expected to return at most n records. If there are fewer records
// available, it should return all of them. If there are no records available
// it should block until there are records available or the context is
// cancelled. If the context is cancelled while ReadN is running, it should
// return the context error.
return []opencdc.Record{}, nil
}

func (s *Source) Ack(_ context.Context, _ opencdc.Position) error {
Expand Down

0 comments on commit c4c572b

Please sign in to comment.