From c4c572ba20f65baa8a1db7d8ccb44357d6ce3ce5 Mon Sep 17 00:00:00 2001 From: Haris Osmanagic Date: Thu, 30 Jan 2025 17:21:38 +0100 Subject: [PATCH] Source connectors should use ReadN --- source.go | 24 ++++++++---------------- 1 file changed, 8 insertions(+), 16 deletions(-) diff --git a/source.go b/source.go index 75f8d39..411dc9b 100644 --- a/source.go +++ b/source.go @@ -64,22 +64,14 @@ func (s *Source) Open(_ context.Context, _ opencdc.Position) error { return nil } -func (s *Source) Read(_ context.Context) (opencdc.Record, error) { - // Read returns a new Record and is supposed to block until there is either - // a new record or the context gets cancelled. It can also return the error - // ErrBackoffRetry to signal to the SDK it should call Read again with a - // backoff retry. - // If Read receives a cancelled context or the context is cancelled while - // Read is running it must stop retrieving new records from the source - // system and start returning records that have already been buffered. If - // there are no buffered records left Read must return the context error to - // signal a graceful stop. If Read returns ErrBackoffRetry while the context - // is cancelled it will also signal that there are no records left and Read - // won't be called again. - // After Read returns an error the function won't be called again (except if - // the error is ErrBackoffRetry, as mentioned above). - // Read can be called concurrently with Ack. - return opencdc.Record{}, nil +func (s *Source) ReadN(context.Context, int) ([]opencdc.Record, error) { + // ReadN is the same as Read, but returns a batch of records. The connector + // is expected to return at most n records. If there are fewer records + // available, it should return all of them. If there are no records available + // it should block until there are records available or the context is + // cancelled. If the context is cancelled while ReadN is running, it should + // return the context error. + return []opencdc.Record{}, nil } func (s *Source) Ack(_ context.Context, _ opencdc.Position) error {