diff --git a/cmd/createtopic/main.go b/cmd/createtopic/main.go deleted file mode 100644 index 32c23e4e..00000000 --- a/cmd/createtopic/main.go +++ /dev/null @@ -1,53 +0,0 @@ -package main - -import ( - "fmt" - "net" - - "github.com/travisjeffery/jocko/protocol" - "github.com/travisjeffery/jocko/server" - kingpin "gopkg.in/alecthomas/kingpin.v2" -) - -var ( - brokerAddr = kingpin.Flag("brokeraddr", "Address for Broker to bind on").Default("0.0.0.0:9092").String() - topic = kingpin.Flag("topic", "Name of topic to create").String() - partitions = kingpin.Flag("partitions", "Number of partitions").Default("1").Int32() - replicationFactor = kingpin.Flag("replicationfactor", "Replication factor").Default("1").Int16() -) - -func main() { - kingpin.Parse() - - addr, err := net.ResolveTCPAddr("tcp", *brokerAddr) - if err != nil { - panic(err) - } - - conn, err := net.DialTCP("tcp", nil, addr) - if err != nil { - panic(err) - } - - client := server.NewClient(conn) - - resp, err := client.CreateTopic("cmd/createtopic", &protocol.CreateTopicRequest{ - Topic: *topic, - NumPartitions: *partitions, - ReplicationFactor: *replicationFactor, - ReplicaAssignment: nil, - Configs: nil, - }) - - if err != nil { - panic(err) - } - - for _, topicErrCode := range resp.TopicErrorCodes { - msg := "ok" - if topicErrCode.ErrorCode == 41 { - msg = "err not controller" - } - fmt.Printf("create topic %s: %s\n", topicErrCode.Topic, msg) - } -} diff --git a/cmd/jocko/main.go b/cmd/jocko/main.go index 02cde6b0..1444ff9f 100644 --- a/cmd/jocko/main.go +++ b/cmd/jocko/main.go @@ -3,10 +3,12 @@ package main import ( "fmt" "os" + "net" "time" "github.com/tj/go-gracefully" "github.com/travisjeffery/jocko/broker" + "github.com/travisjeffery/jocko/protocol" "github.com/travisjeffery/jocko/raft" "github.com/travisjeffery/jocko/serf" "github.com/travisjeffery/jocko/server" @@ -15,40 +17,57 @@ import ( ) var ( - logDir = kingpin.Flag("logdir", "A comma separated list of directories under which to store log files").Default("/tmp/jocko").String() - raftAddr = kingpin.Flag("raftaddr", "Address for Raft to bind and advertise on").Default("127.0.0.1:9093").String() - brokerAddr = kingpin.Flag("brokeraddr", "Address for Broker to bind on").Default("0.0.0.0:9092").String() - serfAddr = kingpin.Flag("serfaddr", "Address for Serf to bind on").Default("0.0.0.0:9094").String() - serfMembers = kingpin.Flag("serfmembers", "List of existing Serf members").Strings() - brokerID = kingpin.Flag("id", "Broker ID").Int32() - debugLogs = kingpin.Flag("debug", "Enable debug logs").Default("false").Bool() + cli = kingpin.New("jocko", "Jocko, a Go implementation of Kafka") + logDir = cli.Flag("logdir", "A comma separated list of directories under which to store log files").Default("/tmp/jocko").String() + debugLogs = cli.Flag("debug", "Enable debug logs").Default("false").Bool() + + brokerCmd = cli.Command("broker", "Operations on brokers") + brokerCmdRaftAddr = brokerCmd.Flag("raftaddr", "Address for Raft to bind and advertise on").Default("127.0.0.1:9093").String() + brokerCmdBrokerAddr = brokerCmd.Flag("brokeraddr", "Address for broker to bind on").Default("0.0.0.0:9092").String() + brokerCmdSerfAddr = brokerCmd.Flag("serfaddr", "Address for Serf to bind on").Default("0.0.0.0:9094").String() + brokerCmdSerfMembers = brokerCmd.Flag("serfmembers", "List of existing Serf members").Strings() + brokerCmdBrokerID = brokerCmd.Flag("id", "Broker ID").Int32() + + topic = cli.Command("topic", "Operations on topics") + topicBrokerAddr = topic.Flag("brokeraddr", "Address for Broker to bind on").Default("0.0.0.0:9092").String() + topicTopic = topic.Flag("topic", "Name of topic to create").String() + topicPartitions = topic.Flag("partitions", "Number of partitions").Default("1").Int32() + topicReplicationFactor = topic.Flag("replicationfactor", "Replication factor").Default("1").Int16() ) func main() { - kingpin.Parse() - logLevel := simplelog.INFO if *debugLogs { logLevel = simplelog.DEBUG } logger := simplelog.New(os.Stdout, logLevel, "jocko") + switch kingpin.MustParse(cli.Parse(os.Args[1:])) { + case brokerCmd.FullCommand(): + os.Exit(CmdBrokers(logger)) + + case topic.FullCommand(): + os.Exit(CmdTopic(logger)) + } +} + +func CmdBrokers(logger *simplelog.Logger) int { serf, err := serf.New( serf.Logger(logger), - serf.Addr(*serfAddr), - serf.InitMembers(*serfMembers), + serf.Addr(*brokerCmdSerfAddr), + serf.InitMembers(*brokerCmdSerfMembers), ) raft, err := raft.New( raft.Logger(logger), raft.DataDir(*logDir), - raft.Addr(*raftAddr), + raft.Addr(*brokerCmdRaftAddr), ) - store, err := broker.New(*brokerID, + store, err := broker.New(*brokerCmdBrokerID, broker.LogDir(*logDir), broker.Logger(logger), - broker.Addr(*brokerAddr), + broker.Addr(*brokerCmdBrokerAddr), broker.Serf(serf), broker.Raft(raft), ) @@ -56,7 +75,7 @@ func main() { fmt.Fprintf(os.Stderr, "Error with new broker: %s\n", err) os.Exit(1) } - srv := server.New(*brokerAddr, store, logger) + srv := server.New(*brokerCmdBrokerAddr, store, logger) if err := srv.Start(); err != nil { fmt.Fprintf(os.Stderr, "Error starting server: %s\n", err) os.Exit(1) @@ -70,4 +89,43 @@ func main() { if err := store.Shutdown(); err != nil { panic(err) } + + return 0 +} + +func CmdTopic(logger *simplelog.Logger) int { + logger.Info("Create topic") + addr, err := net.ResolveTCPAddr("tcp", *topicBrokerAddr) + if err != nil { + panic(err) + } + + conn, err := net.DialTCP("tcp", nil, addr) + if err != nil { + panic(err) + } + + client := server.NewClient(conn) + + resp, err := client.CreateTopic("cmd/createtopic", &protocol.CreateTopicRequest{ + Topic: *topicTopic, + NumPartitions: *topicPartitions, + ReplicationFactor: *topicReplicationFactor, + ReplicaAssignment: nil, + Configs: nil, + }) + + if err != nil { + panic(err) + } + + for _, topicErrCode := range resp.TopicErrorCodes { + msg := "ok" + if topicErrCode.ErrorCode == 41 { + msg = "err not controller" + } + fmt.Printf("create topic %s: %s\n", topicErrCode.Topic, msg) + } + + return 0 } diff --git a/examples/cluster/README.md b/examples/cluster/README.md index e5cbc0e3..166d59ec 100644 --- a/examples/cluster/README.md +++ b/examples/cluster/README.md @@ -13,14 +13,16 @@ $ go build ## Start the nodes ```bash -$ ./jocko --debug \ +$ ./jocko broker \ + --debug \ --logdir="/tmp/jocko1" \ --brokeraddr=127.0.0.1:9001 \ --raftaddr=127.0.0.1:9002 \ --serfaddr=127.0.0.1:9003 \ --id=1 -$ ./jocko --debug \ +$ ./jocko broker \ + --debug \ --logdir="/tmp/jocko2" \ --brokeraddr=127.0.0.1:9101 \ --raftaddr=127.0.0.1:9102 \ @@ -28,7 +30,8 @@ $ ./jocko --debug \ --serfmembers=127.0.0.1:9003 \ --id=2 -$ ./jocko --debug \ +$ ./jocko broker \ + --debug \ --logdir="/tmp/jocko3" \ --brokeraddr=127.0.0.1:9201 \ --raftaddr=127.0.0.1:9202 \