diff --git a/replicator/partition_replicator.go b/replicator/partition_replicator.go index 30e618c9..b96085b3 100644 --- a/replicator/partition_replicator.go +++ b/replicator/partition_replicator.go @@ -45,11 +45,11 @@ func (r *PartitionReplicator) Replicate() { go r.writeMessages() } -func (r *PartitionReplicator) fetchMessages() error { +func (r *PartitionReplicator) fetchMessages() { for { select { case <-r.done: - return nil + return default: fetchBody := &protocol.FetchRequest{ MaxWaitTime: r.MaxWaitTime, @@ -69,29 +69,30 @@ func (r *PartitionReplicator) fetchMessages() error { } b, err := protocol.Encode(req) if err != nil { - return err + panic(err) } _, err = r.Partition.Leader.Write(b) if err != nil { - return err + panic(err) } - var header protocol.Response br := bytes.NewBuffer(make([]byte, 0, 8)) if _, err = io.CopyN(br, r.Partition.Leader, 8); err != nil { - return err + panic(err) } if err = protocol.Decode(br.Bytes(), &header); err != nil { - return err + panic(err) } c := make([]byte, 0, header.Size-4) cr := bytes.NewBuffer(c) _, err = io.CopyN(cr, r.Partition.Leader, int64(header.Size-4)) - + if err != nil { + panic(err) + } fetchResponse := new(protocol.FetchResponses) err = protocol.Decode(cr.Bytes(), fetchResponse) if err != nil { - return err + panic(err) } for _, resp := range fetchResponse.Responses { for _, p := range resp.PartitionResponses { @@ -107,15 +108,15 @@ func (r *PartitionReplicator) fetchMessages() error { } } -func (r *PartitionReplicator) writeMessages() error { +func (r *PartitionReplicator) writeMessages() { for { select { case <-r.done: - return nil + return case msg := <-r.msgs: _, err := r.Partition.CommitLog.Append(msg) if err != nil { - return err + panic(err) } } }