Skip to content
This repository has been archived by the owner on May 30, 2024. It is now read-only.

Commit

Permalink
Initial
Browse files Browse the repository at this point in the history
  • Loading branch information
Mick Staugaard committed Jan 6, 2020
0 parents commit 6359d50
Show file tree
Hide file tree
Showing 6 changed files with 395 additions and 0 deletions.
86 changes: 86 additions & 0 deletions cmd/createTopic.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
package cmd

import (
"fmt"
"github.com/Shopify/sarama"
"github.com/spf13/cobra"
"strings"
)

// createTopicCmd represents the createTopic command
var createTopicCmd = &cobra.Command{
Use: "createTopic",
Short: "Create one or more topics",
Long: `createTopic can create one or more topics. Example:
kafkaCLI createTopic --bootstrap-server kafka:9092 --partitions 4 --replication-factor 1 --config message.format.version=2.0 --if-not-exists topic1 topic2
`,
Args: cobra.MinimumNArgs(1),
Run: func(cmd *cobra.Command, args []string) {
client, err := kafkaClient()
if err != nil {
panic(err)
}
kafkaAdmin, err := sarama.NewClusterAdminFromClient(client)
if err != nil {
panic(err)
}

for _, topicName := range args {
fmt.Println("Creating topic " + topicName)
err = kafkaAdmin.CreateTopic(topicName, topicDetail(), false)
if err != nil {
switch err.(type) {
case *sarama.TopicError:
if err, ok := err.(*sarama.TopicError); ok {
if err.Err != sarama.ErrTopicAlreadyExists {
panic(err)
}

if ifNotExists {
fmt.Println(err)
} else {
panic(err)
}
}
break
default:
panic(err)
}
}
}

_ = kafkaAdmin.Close()
},
}

var partitions int32
var replicationFactor int16
var ifNotExists bool
var configEntries []string

func init() {
rootCmd.AddCommand(createTopicCmd)

createTopicCmd.Flags().Int32VarP(&partitions, "partitions", "p", 1, "number of partitions for the topic")
createTopicCmd.Flags().Int16VarP(&replicationFactor, "replication-factor", "r", 1, "replication-factor for the topic")
createTopicCmd.Flags().BoolVarP(&ifNotExists, "if-not-exists", "", false, "only create the topic if it does not exist")
createTopicCmd.Flags().StringSliceVarP(&configEntries, "config", "c", []string{}, "config")
}

func topicDetail() *sarama.TopicDetail {
config := make(map[string]*string, 0)

for _, entry := range configEntries {
parts := strings.Split(entry, "=")
config[parts[0]] = &parts[1]
}

detail := &sarama.TopicDetail{
NumPartitions: partitions,
ReplicationFactor: replicationFactor,
ConfigEntries: config,
}

return detail
}
48 changes: 48 additions & 0 deletions cmd/deleteTopic.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
package cmd

import (
"fmt"
"github.com/Shopify/sarama"

"github.com/spf13/cobra"
)

// deleteTopicCmd represents the deleteTopic command
var deleteTopicCmd = &cobra.Command{
Use: "deleteTopic",
Short: "Delete one or more topics",
Long: `deleteTopic deletes one or more topics. Example:
kafkaCLI deleteTopic --bootstrap-server kafka:9092 topic1 topic2
`,
Args: cobra.MinimumNArgs(1),
Run: func(cmd *cobra.Command, args []string) {
client, err := kafkaClient()
if err != nil {
panic(err)
}
kafkaAdmin, err := sarama.NewClusterAdminFromClient(client)
if err != nil {
panic(err)
}

for _, topicName := range args {
fmt.Println("Deleting topic " + topicName)
err = kafkaAdmin.DeleteTopic(topicName)

if err != nil {
switch err {
case sarama.ErrUnknownTopicOrPartition:
fmt.Println("Topic " + topicName + " did not exist")
break
default:
panic(err)
}
}
}
},
}

func init() {
rootCmd.AddCommand(deleteTopicCmd)
}
47 changes: 47 additions & 0 deletions cmd/root.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
package cmd

import (
"fmt"
"github.com/spf13/cobra"
"os"

"github.com/Shopify/sarama"
)

var bootstrapServer string

// rootCmd represents the base command when called without any subcommands
var rootCmd = &cobra.Command{
Use: "kafkaCLI",
Short: "A brief description of your application",
Long: `A longer description that spans multiple lines and likely contains
examples and usage of using your application. For example:
Cobra is a CLI library for Go that empowers applications.
This application is a tool to generate the needed files
to quickly create a Cobra application.`,
// Uncomment the following line if your bare application
// has an action associated with it:
// Run: func(cmd *cobra.Command, args []string) { },
}

// Execute adds all child commands to the root command and sets flags appropriately.
// This is called by main.main(). It only needs to happen once to the rootCmd.
func Execute() {
if err := rootCmd.Execute(); err != nil {
fmt.Println(err)
os.Exit(1)
}
}

func init() {
rootCmd.PersistentFlags().StringVarP(&bootstrapServer, "bootstrap-server", "s", "", "address of a node in the kafka cluster")
_ = rootCmd.MarkPersistentFlagRequired("bootstrap-server")
}

func kafkaClient() (sarama.Client, error) {
kafkaConfig := sarama.NewConfig()
kafkaConfig.Version = sarama.V1_0_0_0
addresses := []string{bootstrapServer}
return sarama.NewClient(addresses, kafkaConfig)
}
8 changes: 8 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
module github.com/vend/kafkaCLI

go 1.13

require (
github.com/Shopify/sarama v1.24.1
github.com/spf13/cobra v0.0.5
)
Loading

0 comments on commit 6359d50

Please sign in to comment.