diff --git a/kq/pusher.go b/kq/pusher.go index 94d021b..2c56b2b 100644 --- a/kq/pusher.go +++ b/kq/pusher.go @@ -11,7 +11,7 @@ import ( ) type ( - PushOption func(options *chunkOptions) + PushOption func(options *pushOptions) Pusher struct { producer *kafka.Writer @@ -19,7 +19,11 @@ type ( executor *executors.ChunkExecutor } - chunkOptions struct { + pushOptions struct { + // kafka.Writer options + allowAutoTopicCreation bool + + // executors.ChunkExecutor options chunkSize int flushInterval time.Duration } @@ -33,6 +37,24 @@ func NewPusher(addrs []string, topic string, opts ...PushOption) *Pusher { Balancer: &kafka.LeastBytes{}, Compression: kafka.Snappy, } + + var options pushOptions + for _, opt := range opts { + opt(&options) + } + + // apply kafka.Writer options + producer.AllowAutoTopicCreation = options.allowAutoTopicCreation + + // apply ChunkExecutor options + var chunkOpts []executors.ChunkOption + if options.chunkSize > 0 { + chunkOpts = append(chunkOpts, executors.WithChunkBytes(options.chunkSize)) + } + if options.flushInterval > 0 { + chunkOpts = append(chunkOpts, executors.WithFlushInterval(options.flushInterval)) + } + pusher := &Pusher{ producer: producer, topic: topic, @@ -45,7 +67,7 @@ func NewPusher(addrs []string, topic string, opts ...PushOption) *Pusher { if err := pusher.producer.WriteMessages(context.Background(), chunk...); err != nil { logx.Error(err) } - }, newOptions(opts)...) + }, chunkOpts...) return pusher } @@ -79,30 +101,21 @@ func (p *Pusher) Push(v string) error { // WithChunkSize customizes the Pusher with the given chunk size. func WithChunkSize(chunkSize int) PushOption { - return func(options *chunkOptions) { + return func(options *pushOptions) { options.chunkSize = chunkSize } } // WithFlushInterval customizes the Pusher with the given flush interval. func WithFlushInterval(interval time.Duration) PushOption { - return func(options *chunkOptions) { + return func(options *pushOptions) { options.flushInterval = interval } } -func newOptions(opts []PushOption) []executors.ChunkOption { - var options chunkOptions - for _, opt := range opts { - opt(&options) - } - - var chunkOpts []executors.ChunkOption - if options.chunkSize > 0 { - chunkOpts = append(chunkOpts, executors.WithChunkBytes(options.chunkSize)) - } - if options.flushInterval > 0 { - chunkOpts = append(chunkOpts, executors.WithFlushInterval(options.flushInterval)) +// WithAllowAutoTopicCreation allows the Pusher to create the given topic if it does not exist. +func WithAllowAutoTopicCreation() PushOption { + return func(options *pushOptions) { + options.allowAutoTopicCreation = true } - return chunkOpts }