Skip to content

Commit

Permalink
Merge pull request #61 from travisjeffery/server-broker-refactor
Browse files Browse the repository at this point in the history
The Big Server and Broker Refactor
  • Loading branch information
Travis Jeffery authored Oct 16, 2017
2 parents 94c4f20 + e0363fe commit 83c6e61
Show file tree
Hide file tree
Showing 9 changed files with 586 additions and 561 deletions.
466 changes: 443 additions & 23 deletions broker/broker.go

Large diffs are not rendered by default.

59 changes: 0 additions & 59 deletions broker/replication_manager.go

This file was deleted.

3 changes: 2 additions & 1 deletion cmd/jocko/main.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package main

import (
"context"
"fmt"
"net"
"os"
Expand Down Expand Up @@ -91,7 +92,7 @@ func CmdBrokers(logger *simplelog.Logger) int {

prom := prometheus.DefaultRegisterer
srv := server.New(*brokerCmdBrokerAddr, store, logger, prom)
if err := srv.Start(); err != nil {
if err := srv.Start(context.Background()); err != nil {
fmt.Fprintf(os.Stderr, "Error starting server: %s\n", err)
os.Exit(1)
}
Expand Down
10 changes: 6 additions & 4 deletions examples/sarama/main.go
Original file line number Diff line number Diff line change
@@ -1,21 +1,23 @@
package main

import (
"context"
"fmt"
"log"
"os"
"time"

"net/http"

"github.com/Shopify/sarama"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
"github.com/travisjeffery/jocko/broker"
"github.com/travisjeffery/jocko/protocol"
"github.com/travisjeffery/jocko/raft"
"github.com/travisjeffery/jocko/serf"
"github.com/travisjeffery/jocko/server"
"github.com/travisjeffery/simplelog"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
"net/http"
)

type check struct {
Expand Down Expand Up @@ -139,7 +141,7 @@ func setup() func() {
}
r := prometheus.DefaultRegisterer
server := server.New(brokerAddr, store, logger, r)
if err := server.Start(); err != nil {
if err := server.Start(context.Background()); err != nil {
fmt.Fprintf(os.Stderr, "Error starting server: %s\n", err)
os.Exit(1)
}
Expand Down
14 changes: 14 additions & 0 deletions jocko.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package jocko

import (
"context"
"encoding/json"
"fmt"
"io"
Expand Down Expand Up @@ -166,8 +167,21 @@ type Raft interface {
Addr() string
}

type Request struct {
Conn io.ReadWriter
Header *protocol.RequestHeader
Request interface{}
}

type Response struct {
Conn io.ReadWriter
Header *protocol.RequestHeader
Response interface{}
}

// Broker is the interface that wraps the Broker's methods.
type Broker interface {
Run(context.Context, <-chan Request, chan<- Response)
ID() int32
IsController() bool
CreateTopic(topic string, partitions int32, replicationFactor int16) protocol.Error
Expand Down
14 changes: 7 additions & 7 deletions serf/serf_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import (
"os"
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/travisjeffery/jocko"
"github.com/travisjeffery/jocko/serf"
"github.com/travisjeffery/jocko/testutil"
Expand All @@ -24,9 +24,9 @@ func init() {

func Test_Membership(t *testing.T) {
s0, err := getSerf(0)
assert.NoError(t, err)
require.NoError(t, err)
s1, err := getSerf(1)
assert.NoError(t, err)
require.NoError(t, err)

t.Run("Join Peer", func(t *testing.T) {
testJoin(t, s0, s1)
Expand All @@ -45,7 +45,7 @@ func Test_Membership(t *testing.T) {
})

t.Run("Remove Peer", func(t *testing.T) {
assert.NoError(t, s1.Shutdown())
require.NoError(t, s1.Shutdown())

testutil.WaitForResult(func() (bool, error) {
if len(s0.Cluster()) != 1 {
Expand All @@ -57,7 +57,7 @@ func Test_Membership(t *testing.T) {
})
})

assert.NoError(t, s0.Shutdown())
require.NoError(t, s0.Shutdown())
}

func getSerf(id int32) (*serf.Serf, error) {
Expand Down Expand Up @@ -85,7 +85,7 @@ func getSerfAddr() string {
func testJoin(t *testing.T, s0 *serf.Serf, other ...*serf.Serf) {
for ind, s1 := range other {
num, err := s1.Join(s0.Addr())
assert.NoError(t, err)
assert.Equal(t, ind+1, num)
require.NoError(t, err)
require.Equal(t, ind+1, num)
}
}
22 changes: 22 additions & 0 deletions server/metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package server

import "github.com/prometheus/client_golang/prometheus"

type serverMetrics struct {
requestsHandled prometheus.Counter
}

func newServerMetrics(r prometheus.Registerer) *serverMetrics {
m := &serverMetrics{
requestsHandled: prometheus.NewCounter(prometheus.CounterOpts{
Name: "requests_handled",
Help: "Number of requests handled by the server.",
}),
}
if r != nil {
r.MustRegister(
m.requestsHandled,
)
}
return m
}
Loading

0 comments on commit 83c6e61

Please sign in to comment.