From d45bcd609a3338a478dc9b923ee03a574a64e763 Mon Sep 17 00:00:00 2001 From: Travis Jeffery Date: Sun, 30 Oct 2016 03:14:31 -0500 Subject: [PATCH] add client metadata api --- client/client.go | 42 +++++++++++++++++++------- client/client_test.go | 69 +++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 101 insertions(+), 10 deletions(-) create mode 100644 client/client_test.go diff --git a/client/client.go b/client/client.go index 11d06ce2..0de9a4bb 100644 --- a/client/client.go +++ b/client/client.go @@ -1,20 +1,42 @@ package client -type Client struct { -} - -func New() *Client { - return &Client{} -} +import ( + "bytes" + "encoding/json" + "net/http" -func (c *Client) Metadata() { + "github.com/travisjeffery/jocko/server" +) +type Options struct { + Addr string } -func (c *Client) Produce() { - +type Client struct { + Options + c *http.Client } -func (c *Client) Fetch() { +func New(options Options) *Client { + return &Client{ + Options: options, + c: &http.Client{}, + } +} +func (c *Client) Metadata(topics ...string) (metadata server.MetadataResponse, err error) { + req := server.MetadataRequest{Topics: topics} + b := new(bytes.Buffer) + if err = json.NewEncoder(b).Encode(req); err != nil { + return + } + res, err := c.c.Post(c.Addr+"/metadata", "application/json", b) + if err != nil { + return metadata, err + } + defer res.Body.Close() + if err := json.NewDecoder(res.Body).Decode(&metadata); err != nil { + return metadata, err + } + return metadata, nil } diff --git a/client/client_test.go b/client/client_test.go new file mode 100644 index 00000000..beb675e3 --- /dev/null +++ b/client/client_test.go @@ -0,0 +1,69 @@ +package client + +import ( + "os" + "path/filepath" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/suite" + "github.com/travisjeffery/jocko/broker" + "github.com/travisjeffery/jocko/server" +) + +type ClientTestSuite struct { + suite.Suite + + broker *broker.Broker + server *server.Server +} + +func (suite *ClientTestSuite) SetupSuite() { + dir := os.TempDir() + os.RemoveAll(dir) + logs := filepath.Join(dir, "logs") + assert.NoError(suite.T(), os.MkdirAll(logs, 0755)) + + data := filepath.Join(dir, "data") + assert.NoError(suite.T(), os.MkdirAll(data, 0755)) + + suite.broker = broker.New(broker.Options{ + DataDir: data, + BindAddr: ":0", + LogDir: logs, + }) + assert.NoError(suite.T(), suite.broker.Open()) + + _, err := suite.broker.WaitForLeader(10 * time.Second) + assert.NoError(suite.T(), err) + + err = suite.broker.CreateTopic("my_topic", 2) + assert.NoError(suite.T(), err) + + suite.server = server.New("localhost:3000", suite.broker) + assert.NoError(suite.T(), err) + assert.NotNil(suite.T(), suite.server) + assert.NoError(suite.T(), suite.server.Start()) +} + +func (suite *ClientTestSuite) TeardownSuite() { + suite.broker.Close() + suite.server.Close() +} + +func (suite *ClientTestSuite) TestRefreshMetadata() { + c := New(Options{ + Addr: "http://localhost:3000", + }) + metadata, err := c.Metadata() + assert.NoError(suite.T(), err) + assert.Equal(suite.T(), ":0", metadata.ControllerID) + assert.Equal(suite.T(), ":0", metadata.Brokers[0].ID) + assert.Equal(suite.T(), "my_topic", metadata.TopicMetadata[0].Topic) + assert.Equal(suite.T(), ":0", metadata.TopicMetadata[0].PartitionMetadata[0].Leader) +} + +func TestClient(t *testing.T) { + suite.Run(t, new(ClientTestSuite)) +}