Skip to content

Commit

Permalink
replicator: fail fast when something goes wrong during replication
Browse files Browse the repository at this point in the history
  • Loading branch information
travisjeffery committed Dec 5, 2016
1 parent 22b8a88 commit 222cc90
Showing 1 changed file with 13 additions and 12 deletions.
25 changes: 13 additions & 12 deletions replicator/partition_replicator.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,11 +45,11 @@ func (r *PartitionReplicator) Replicate() {
go r.writeMessages()
}

func (r *PartitionReplicator) fetchMessages() error {
func (r *PartitionReplicator) fetchMessages() {
for {
select {
case <-r.done:
return nil
return
default:
fetchBody := &protocol.FetchRequest{
MaxWaitTime: r.MaxWaitTime,
Expand All @@ -69,29 +69,30 @@ func (r *PartitionReplicator) fetchMessages() error {
}
b, err := protocol.Encode(req)
if err != nil {
return err
panic(err)
}
_, err = r.Partition.Leader.Write(b)
if err != nil {
return err
panic(err)
}

var header protocol.Response
br := bytes.NewBuffer(make([]byte, 0, 8))
if _, err = io.CopyN(br, r.Partition.Leader, 8); err != nil {
return err
panic(err)
}
if err = protocol.Decode(br.Bytes(), &header); err != nil {
return err
panic(err)
}
c := make([]byte, 0, header.Size-4)
cr := bytes.NewBuffer(c)
_, err = io.CopyN(cr, r.Partition.Leader, int64(header.Size-4))

if err != nil {
panic(err)
}
fetchResponse := new(protocol.FetchResponses)
err = protocol.Decode(cr.Bytes(), fetchResponse)
if err != nil {
return err
panic(err)
}
for _, resp := range fetchResponse.Responses {
for _, p := range resp.PartitionResponses {
Expand All @@ -107,15 +108,15 @@ func (r *PartitionReplicator) fetchMessages() error {
}
}

func (r *PartitionReplicator) writeMessages() error {
func (r *PartitionReplicator) writeMessages() {
for {
select {
case <-r.done:
return nil
return
case msg := <-r.msgs:
_, err := r.Partition.CommitLog.Append(msg)
if err != nil {
return err
panic(err)
}
}
}
Expand Down

0 comments on commit 222cc90

Please sign in to comment.