diff --git a/Godeps b/Godeps index 54e64b8d9..9bfc26dfe 100644 --- a/Godeps +++ b/Godeps @@ -1,6 +1,6 @@ github.com/BurntSushi/toml 2dff11163ee667d51dcc066660925a92ce138deb github.com/bitly/go-hostpool 58b95b10d6ca26723a7f46017b348653b825a8d6 -github.com/bitly/go-nsq 4271cd1529a78175e327570894988cc2cb21228f # v1.0.5-alpha +github.com/bitly/go-nsq 0f97a46d801c18d6fd1a12a9040d11cc4e4ef397 # v1.0.5-alpha github.com/bitly/go-simplejson 18db6e68d8fd9cbf2e8ebe4c81a78b96fd9bf05a github.com/bmizerany/perks/quantile 6cb9d9d729303ee2628580d9aec5db968da3a607 github.com/mreiferson/go-options 2cf7eb1fdd83e2bb3375fef6fdadb04c3ad564da diff --git a/apps/nsq_tail/nsq_tail.go b/apps/nsq_tail/nsq_tail.go index 1b71e8331..101d8d96c 100644 --- a/apps/nsq_tail/nsq_tail.go +++ b/apps/nsq_tail/nsq_tail.go @@ -23,16 +23,11 @@ var ( maxInFlight = flag.Int("max-in-flight", 200, "max number of messages to allow in flight") totalMessages = flag.Int("n", 0, "total messages to show (will wait if starved)") - consumerOpts = app.StringArray{} nsqdTCPAddrs = app.StringArray{} lookupdHTTPAddrs = app.StringArray{} ) func init() { - // TODO: remove, deprecated - flag.Var(&consumerOpts, "reader-opt", "(deprecated) use --consumer-opt") - flag.Var(&consumerOpts, "consumer-opt", "option to passthrough to nsq.Consumer (may be given multiple times, http://godoc.org/github.com/bitly/go-nsq#Config)") - flag.Var(&nsqdTCPAddrs, "nsqd-tcp-address", "nsqd TCP address (may be given multiple times)") flag.Var(&lookupdHTTPAddrs, "lookupd-http-address", "lookupd HTTP address (may be given multiple times)") } @@ -59,6 +54,11 @@ func (th *TailHandler) HandleMessage(m *nsq.Message) error { } func main() { + cfg := nsq.NewConfig() + // TODO: remove, deprecated + flag.Var(&nsq.ConfigFlag{cfg}, "reader-opt", "(deprecated) use --consumer-opt") + flag.Var(&nsq.ConfigFlag{cfg}, "consumer-opt", "option to passthrough to nsq.Consumer (may be given multiple times, http://godoc.org/github.com/bitly/go-nsq#Config)") + flag.Parse() if *showVersion { @@ -90,12 +90,7 @@ func main() { *maxInFlight = *totalMessages } - cfg := nsq.NewConfig() cfg.UserAgent = fmt.Sprintf("nsq_tail/%s go-nsq/%s", version.Binary, nsq.VERSION) - err := app.ParseOpts(cfg, consumerOpts) - if err != nil { - log.Fatal(err) - } cfg.MaxInFlight = *maxInFlight consumer, err := nsq.NewConsumer(*topic, *channel, cfg) diff --git a/apps/nsq_to_file/nsq_to_file.go b/apps/nsq_to_file/nsq_to_file.go index ff004779c..60b9ca521 100644 --- a/apps/nsq_to_file/nsq_to_file.go +++ b/apps/nsq_to_file/nsq_to_file.go @@ -44,7 +44,6 @@ var ( rotateSize = flag.Int64("rotate-size", 0, "rotate the file when it grows bigger than `rotate-size` bytes") rotateInterval = flag.Duration("rotate-interval", 0*time.Second, "rotate the file every duration") - consumerOpts = app.StringArray{} nsqdTCPAddrs = app.StringArray{} lookupdHTTPAddrs = app.StringArray{} topics = app.StringArray{} @@ -54,10 +53,6 @@ var ( ) func init() { - // TODO: remove, deprecated - flag.Var(&consumerOpts, "reader-opt", "(deprecated) use --consumer-opt") - flag.Var(&consumerOpts, "consumer-opt", "option to passthrough to nsq.Consumer (may be given multiple times, http://godoc.org/github.com/bitly/go-nsq#Config)") - flag.Var(&nsqdTCPAddrs, "nsqd-tcp-address", "nsqd TCP address (may be given multiple times)") flag.Var(&lookupdHTTPAddrs, "lookupd-http-address", "lookupd HTTP address (may be given multiple times)") flag.Var(&topics, "topic", "nsq topic (may be given multiple times)") @@ -93,13 +88,15 @@ type TopicDiscoverer struct { termChan chan os.Signal hupChan chan os.Signal wg sync.WaitGroup + cfg *nsq.Config } -func newTopicDiscoverer() *TopicDiscoverer { +func newTopicDiscoverer(cfg *nsq.Config) *TopicDiscoverer { return &TopicDiscoverer{ topics: make(map[string]*ConsumerFileLogger), termChan: make(chan os.Signal), hupChan: make(chan os.Signal), + cfg: cfg, } } @@ -360,20 +357,12 @@ func hasArg(s string) bool { return false } -func newConsumerFileLogger(topic string) (*ConsumerFileLogger, error) { +func newConsumerFileLogger(topic string, cfg *nsq.Config) (*ConsumerFileLogger, error) { f, err := NewFileLogger(*gzipEnabled, *gzipLevel, *filenameFormat, topic) if err != nil { return nil, err } - cfg := nsq.NewConfig() - cfg.UserAgent = fmt.Sprintf("nsq_to_file/%s go-nsq/%s", version.Binary, nsq.VERSION) - err = app.ParseOpts(cfg, consumerOpts) - if err != nil { - return nil, err - } - cfg.MaxInFlight = *maxInFlight - consumer, err := nsq.NewConsumer(topic, *channel, cfg) if err != nil { return nil, err @@ -424,7 +413,7 @@ func (t *TopicDiscoverer) syncTopics(addrs []string, pattern string) { log.Println("Skipping topic ", topic, "as it didn't match required pattern:", pattern) continue } - logger, err := newConsumerFileLogger(topic) + logger, err := newConsumerFileLogger(topic, t.cfg) if err != nil { log.Printf("ERROR: couldn't create logger for new topic %s: %s", topic, err) continue @@ -466,6 +455,12 @@ func (t *TopicDiscoverer) watch(addrs []string, sync bool, pattern string) { } func main() { + cfg := nsq.NewConfig() + + // TODO: remove, deprecated + flag.Var(&nsq.ConfigFlag{cfg}, "reader-opt", "(deprecated) use --consumer-opt") + flag.Var(&nsq.ConfigFlag{cfg}, "consumer-opt", "option to passthrough to nsq.Consumer (may be given multiple times, http://godoc.org/github.com/bitly/go-nsq#Config)") + flag.Parse() if *showVersion { @@ -505,7 +500,10 @@ func main() { } } - discoverer := newTopicDiscoverer() + cfg.UserAgent = fmt.Sprintf("nsq_to_file/%s go-nsq/%s", version.Binary, nsq.VERSION) + cfg.MaxInFlight = *maxInFlight + + discoverer := newTopicDiscoverer(cfg) signal.Notify(discoverer.hupChan, syscall.SIGHUP) signal.Notify(discoverer.termChan, syscall.SIGINT, syscall.SIGTERM) @@ -528,7 +526,7 @@ func main() { continue } - logger, err := newConsumerFileLogger(topic) + logger, err := newConsumerFileLogger(topic, cfg) if err != nil { log.Fatalf("ERROR: couldn't create logger for topic %s: %s", topic, err) } diff --git a/apps/nsq_to_http/nsq_to_http.go b/apps/nsq_to_http/nsq_to_http.go index 97620443c..38427e2ad 100644 --- a/apps/nsq_to_http/nsq_to_http.go +++ b/apps/nsq_to_http/nsq_to_http.go @@ -46,7 +46,6 @@ var ( statusEvery = flag.Int("status-every", 250, "the # of requests between logging status (per handler), 0 disables") contentType = flag.String("content-type", "application/octet-stream", "the Content-Type used for POST requests") - consumerOpts = app.StringArray{} getAddrs = app.StringArray{} postAddrs = app.StringArray{} nsqdTCPAddrs = app.StringArray{} @@ -60,9 +59,6 @@ var ( ) func init() { - // TODO: remove, deprecated - flag.Var(&consumerOpts, "reader-opt", "(deprecated) use --consumer-opt") - flag.Var(&consumerOpts, "consumer-opt", "option to passthrough to nsq.Consumer (may be given multiple times, http://godoc.org/github.com/bitly/go-nsq#Config)") flag.Var(&postAddrs, "post", "HTTP address to make a POST request to. data will be in the body (may be given multiple times)") flag.Var(&getAddrs, "get", "HTTP address to make a GET request to. '%s' will be printf replaced with data (may be given multiple times)") @@ -171,6 +167,11 @@ func hasArg(s string) bool { } func main() { + cfg := nsq.NewConfig() + // TODO: remove, deprecated + flag.Var(&nsq.ConfigFlag{cfg}, "reader-opt", "(deprecated) use --consumer-opt") + flag.Var(&nsq.ConfigFlag{cfg}, "consumer-opt", "option to passthrough to nsq.Consumer (may be given multiple times, http://godoc.org/github.com/bitly/go-nsq#Config)") + var publisher Publisher var addresses app.StringArray var selectedMode int @@ -259,12 +260,7 @@ func main() { addresses = getAddrs } - cfg := nsq.NewConfig() cfg.UserAgent = fmt.Sprintf("nsq_to_http/%s go-nsq/%s", version.Binary, nsq.VERSION) - err := app.ParseOpts(cfg, consumerOpts) - if err != nil { - log.Fatal(err) - } cfg.MaxInFlight = *maxInFlight // TODO: remove, deprecated diff --git a/apps/nsq_to_nsq/nsq_to_nsq.go b/apps/nsq_to_nsq/nsq_to_nsq.go index de5583ae3..05d113b86 100644 --- a/apps/nsq_to_nsq/nsq_to_nsq.go +++ b/apps/nsq_to_nsq/nsq_to_nsq.go @@ -42,8 +42,6 @@ var ( statusEvery = flag.Int("status-every", 250, "the # of requests between logging status (per destination), 0 disables") mode = flag.String("mode", "hostpool", "the upstream request mode options: round-robin, hostpool (default), epsilon-greedy") - consumerOpts = app.StringArray{} - producerOpts = app.StringArray{} nsqdTCPAddrs = app.StringArray{} lookupdHTTPAddrs = app.StringArray{} destNsqdTCPAddrs = app.StringArray{} @@ -57,11 +55,6 @@ var ( ) func init() { - // TODO: remove, deprecated - flag.Var(&consumerOpts, "reader-opt", "(deprecated) use --consumer-opt") - flag.Var(&consumerOpts, "consumer-opt", "option to passthrough to nsq.Consumer (may be given multiple times, see http://godoc.org/github.com/bitly/go-nsq#Config)") - flag.Var(&producerOpts, "producer-opt", "option to passthrough to nsq.Producer (may be given multiple times, see http://godoc.org/github.com/bitly/go-nsq#Config)") - flag.Var(&nsqdTCPAddrs, "nsqd-tcp-address", "nsqd TCP address (may be given multiple times)") flag.Var(&destNsqdTCPAddrs, "destination-nsqd-tcp-address", "destination nsqd TCP address (may be given multiple times)") flag.Var(&lookupdHTTPAddrs, "lookupd-http-address", "lookupd HTTP address (may be given multiple times)") @@ -274,6 +267,14 @@ func hasArg(s string) bool { func main() { var selectedMode int + cCfg := nsq.NewConfig() + pCfg := nsq.NewConfig() + + // TODO: remove, deprecated + flag.Var(&nsq.ConfigFlag{cCfg}, "reader-opt", "(deprecated) use --consumer-opt") + flag.Var(&nsq.ConfigFlag{cCfg}, "consumer-opt", "option to passthrough to nsq.Consumer (may be given multiple times, see http://godoc.org/github.com/bitly/go-nsq#Config)") + flag.Var(&nsq.ConfigFlag{pCfg}, "producer-opt", "option to passthrough to nsq.Producer (may be given multiple times, see http://godoc.org/github.com/bitly/go-nsq#Config)") + flag.Parse() if *showVersion { @@ -324,12 +325,7 @@ func main() { defaultUA := fmt.Sprintf("nsq_to_nsq/%s go-nsq/%s", version.Binary, nsq.VERSION) - cCfg := nsq.NewConfig() cCfg.UserAgent = defaultUA - err := app.ParseOpts(cCfg, consumerOpts) - if err != nil { - log.Fatal(err) - } cCfg.MaxInFlight = *maxInFlight // TODO: remove, deprecated @@ -338,19 +334,13 @@ func main() { cCfg.MaxBackoffDuration = *maxBackoffDuration } - pCfg := nsq.NewConfig() - pCfg.UserAgent = defaultUA - - err = app.ParseOpts(pCfg, producerOpts) - if err != nil { - log.Fatal(err) - } - consumer, err := nsq.NewConsumer(*topic, *channel, cCfg) if err != nil { log.Fatal(err) } + pCfg.UserAgent = defaultUA + producers := make(map[string]*nsq.Producer) for _, addr := range destNsqdTCPAddrs { producer, err := nsq.NewProducer(addr, pCfg) diff --git a/apps/to_nsq/to_nsq.go b/apps/to_nsq/to_nsq.go index 813fd31ab..5e7d9c856 100644 --- a/apps/to_nsq/to_nsq.go +++ b/apps/to_nsq/to_nsq.go @@ -23,15 +23,16 @@ var ( delimiter = flag.String("delimiter", "\n", "character to split input from stdin (defaults to '\n')") destNsqdTCPAddrs = app.StringArray{} - producerOpts = app.StringArray{} ) func init() { - flag.Var(&producerOpts, "producer-opt", "option to passthrough to nsq.Producer (may be given multiple times, http://godoc.org/github.com/bitly/go-nsq#Config)") flag.Var(&destNsqdTCPAddrs, "nsqd-tcp-address", "destination nsqd TCP address (may be given multiple times)") } func main() { + cfg := nsq.NewConfig() + flag.Var(&nsq.ConfigFlag{cfg}, "producer-opt", "option to passthrough to nsq.Producer (may be given multiple times, http://godoc.org/github.com/bitly/go-nsq#Config)") + flag.Parse() if len(*topic) == 0 { @@ -46,14 +47,8 @@ func main() { termChan := make(chan os.Signal, 1) signal.Notify(termChan, syscall.SIGINT, syscall.SIGTERM) - cfg := nsq.NewConfig() cfg.UserAgent = fmt.Sprintf("to_nsq/%s go-nsq/%s", version.Binary, nsq.VERSION) - err := app.ParseOpts(cfg, producerOpts) - if err != nil { - log.Fatal(err) - } - // make the producers producers := make(map[string]*nsq.Producer) for _, addr := range destNsqdTCPAddrs { diff --git a/internal/app/parse_opts.go b/internal/app/parse_opts.go deleted file mode 100644 index 10796fb63..000000000 --- a/internal/app/parse_opts.go +++ /dev/null @@ -1,29 +0,0 @@ -package app - -import ( - "errors" - "strings" - - "github.com/bitly/go-nsq" -) - -func ParseOpts(cfg *nsq.Config, opts StringArray) error { - var err error - for _, opt := range opts { - parts := strings.Split(opt, ",") - key := parts[0] - switch len(parts) { - case 1: - // default options specified without a value to boolean true - err = cfg.Set(key, true) - case 2: - err = cfg.Set(key, parts[1]) - default: - err = errors.New("cannot have more than 2 parameters") - } - if err != nil { - return err - } - } - return nil -}