Skip to content

Commit

Permalink
fix the historical on consuming
Browse files Browse the repository at this point in the history
  • Loading branch information
darkua committed Nov 25, 2016
1 parent 2671d2c commit 8a823f3
Show file tree
Hide file tree
Showing 2 changed files with 1 addition and 24 deletions.
21 changes: 1 addition & 20 deletions kafka_meta.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,26 +26,7 @@ func listenToMetadata(client *sarama.Client) {
if meta.Cancel != nil {
handleCancelRequest(meta.Cancel)
}
if meta.Restart != nil && !meta.Historical {
handleRestartRequest(meta.Restart)
}
}
msg.Processed <- "success"
}
log.Panic("listenToMetadada: Lost connection to Kafka")
}

//hack because we cant have history on the restart queue! another queue? another client?
func listenToRestart(client *sarama.Client) {
topic := bn.ModuleToTopic(ModuleName, true)
ch := make(chan *kafka.ConsumerData)
go kafka.Consume(client, topic, ch)
for msg := range ch {
var meta pb.Meta
if err := proto.Unmarshal(msg.Value, &meta); err != nil {
log.WithError(err).Warn("Unknown meta data received")
} else {
if meta.Restart != nil {
if meta.Restart != nil && !msg.Historical {
handleRestartRequest(meta.Restart)
}
}
Expand Down
4 changes: 0 additions & 4 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,9 +57,6 @@ func run(address string, kafkaBrokers []string) {
client := kafka.NewClient(kafkaBrokers, clientID, debug)
defer (*client).Close()

clientRestart := kafka.NewClient(kafkaBrokers, clientID+".restart", debug)
defer (*clientRestart).Close()

clientgroup := kafka.NewClientGroup(kafkaBrokers, clientID, debug)
defer (*clientgroup).Close()

Expand All @@ -73,7 +70,6 @@ func run(address string, kafkaBrokers []string) {
go registerModule(producer, ModuleName)
go startWorker(clientgroup, producer, []string{module.KafkaTopicName("IO"), module.KafkaTopicName("Ping")})
go listenToMetadata(client)
go listenToRestart(clientRestart)
}

log.WithFields(log.Fields{"version": Version, "interfaces": interfaces, "debug": debug}).Warn("Start module")
Expand Down

0 comments on commit 8a823f3

Please sign in to comment.