Skip to content

Commit

Permalink
Take the list of kafka brokers from the config
Browse files Browse the repository at this point in the history
  • Loading branch information
aduffeck committed Mar 6, 2024
1 parent 10fa589 commit 4ebcec4
Show file tree
Hide file tree
Showing 3 changed files with 6 additions and 5 deletions.
5 changes: 3 additions & 2 deletions pkg/storage/fs/posix/options/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,9 @@ import (
type Options struct {
decomposedoptions.Options

WatchType string `mapstructure:"watch_type"`
WatchPath string `mapstructure:"watch_path"`
WatchType string `mapstructure:"watch_type"`
WatchPath string `mapstructure:"watch_path"`
WatchFolderKafkaBrokers string `mapstructure:"watch_folder_kafka_brokers"`
}

// New returns a new Options instance for the given configuration
Expand Down
4 changes: 2 additions & 2 deletions pkg/storage/fs/posix/tree/gpfswatchfolderwatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,9 @@ type GpfsWatchFolderWatcher struct {
c *kafka.Consumer
}

func NewGpfsWatchFolderWatcher(tree *Tree, kafkaservers []string) (*GpfsWatchFolderWatcher, error) {
func NewGpfsWatchFolderWatcher(tree *Tree, kafkaBrokers []string) (*GpfsWatchFolderWatcher, error) {
c, err := kafka.NewConsumer(&kafka.ConfigMap{
"bootstrap.servers": strings.Join(kafkaservers, ","),
"bootstrap.servers": strings.Join(kafkaBrokers, ","),
"group.id": "ocis-posixfs",
"auto.offset.reset": "earliest",
})
Expand Down
2 changes: 1 addition & 1 deletion pkg/storage/fs/posix/tree/tree.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ func New(lu node.PathLookup, bs Blobstore, o *options.Options, cache store.Store
var err error
switch o.WatchType {
case "gpfswatchfolder":
t.watcher, err = NewGpfsWatchFolderWatcher(t, []string{"192.168.3.99:29092"})
t.watcher, err = NewGpfsWatchFolderWatcher(t, strings.Split(o.WatchFolderKafkaBrokers, ","))
if err != nil {
return nil, err
}
Expand Down

0 comments on commit 4ebcec4

Please sign in to comment.