diff --git a/example/main.go b/example/main.go index ef93110d..bea081b8 100644 --- a/example/main.go +++ b/example/main.go @@ -37,38 +37,12 @@ var ( func main() { kingpin.Parse() - logger := simplelog.New(os.Stdout, simplelog.DEBUG, "jocko") - - store := broker.New(broker.Options{ - DataDir: *raftDir, - RaftAddr: *raftAddr, - TCPAddr: *tcpaddr, - LogDir: *logDir, - ID: *brokerID, - Logger: logger, - }) - if err := store.Open(); err != nil { - fmt.Fprintf(os.Stderr, "Error opening raft store: %s\n", err) - os.Exit(1) - } - server := server.New(*tcpaddr, store, logger) - if err := server.Start(); err != nil { - fmt.Fprintf(os.Stderr, "Error starting server: %s\n", err) - os.Exit(1) - } - - if _, err := store.WaitForLeader(10 * time.Second); err != nil { - panic(err) - } - - // creating/deleting topic directly since Sarama doesn't support it - if err := store.CreateTopic(topic, numPartitions); err != nil && err != broker.ErrTopicExists { - panic(err) - } + setup() config := sarama.NewConfig() config.ChannelBufferSize = 1 config.Version = sarama.V0_10_0_1 + config.Producer.Return.Successes = true brokers := []string{*tcpaddr} producer, err := sarama.NewSyncProducer(brokers, config) @@ -136,3 +110,34 @@ func main() { } fmt.Printf("producer and consumer worked! %d messages ok\n", totalChecked) } + +func setup() { + logger := simplelog.New(os.Stdout, simplelog.INFO, "jocko") + + store := broker.New(broker.Options{ + DataDir: *raftDir, + RaftAddr: *raftAddr, + TCPAddr: *tcpaddr, + LogDir: *logDir, + ID: *brokerID, + Logger: logger, + }) + if err := store.Open(); err != nil { + fmt.Fprintf(os.Stderr, "Error opening raft store: %s\n", err) + os.Exit(1) + } + server := server.New(*tcpaddr, store, logger) + if err := server.Start(); err != nil { + fmt.Fprintf(os.Stderr, "Error starting server: %s\n", err) + os.Exit(1) + } + + if _, err := store.WaitForLeader(10 * time.Second); err != nil { + panic(err) + } + + // creating/deleting topic directly since Sarama doesn't support it + if err := store.CreateTopic(topic, numPartitions); err != nil && err != broker.ErrTopicExists { + panic(err) + } +}