feat(distributor): Add stream metadata writes to separate topic (#15648)
periklis authored Jan 9, 2025
1 parent f2fc0c2 commit 70d8206
Showing 5 changed files with 662 additions and 231 deletions.
4 changes: 4 additions & 0 deletions pkg/distributor/distributor.go
@@ -1089,6 +1089,10 @@ func (d *Distributor) sendStreamToKafka(ctx context.Context, stream KeyedStream,
return fmt.Errorf("failed to marshal write request to records: %w", err)
}

// Add metadata record
metadataRecord := kafka.EncodeStreamMetadata(partitionID, d.cfg.KafkaConfig.Topic, tenant, stream.Stream, startTime)
records = append(records, metadataRecord)

d.kafkaRecordsPerRequest.Observe(float64(len(records)))

produceResults := d.kafkaWriter.ProduceSync(ctx, records)
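Note that EncodeStreamMetadata (added below in pkg/kafka/encoding.go) returns nil when the stream has empty labels or a zero hash, or when marshalling fails, while the hunk above appends the record unconditionally. A defensive variant of the call site could guard the append; this is a sketch only, not part of the commit:

    // Sketch only: skip the append when EncodeStreamMetadata returns nil.
    if metadataRecord := kafka.EncodeStreamMetadata(partitionID, d.cfg.KafkaConfig.Topic, tenant, stream.Stream, startTime); metadataRecord != nil {
        records = append(records, metadataRecord)
    }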
75 changes: 70 additions & 5 deletions pkg/kafka/encoding.go
@@ -6,6 +6,7 @@ import (
"fmt"
math_bits "math/bits"
"sync"
"time"

"github.com/twmb/franz-go/pkg/kgo"

@@ -16,11 +17,23 @@ import (
"github.com/grafana/loki/v3/pkg/logql/syntax"
)

var encoderPool = sync.Pool{
New: func() any {
return &logproto.Stream{}
},
}
const (
metadataTopicSuffix = ".metadata"
)

var (
encoderPool = sync.Pool{
New: func() any {
return &logproto.Stream{}
},
}

metadataPool = sync.Pool{
New: func() any {
return &logproto.StreamMetadata{}
},
}
)

// Encode converts a logproto.Stream into one or more Kafka records.
// It handles splitting large streams into multiple records if necessary.
@@ -182,3 +195,55 @@ func (d *Decoder) DecodeWithoutLabels(data []byte) (logproto.Stream, error) {
func sovPush(x uint64) (n int) {
return (math_bits.Len64(x|1) + 6) / 7
}

// EncodeStreamMetadata encodes the stream metadata into a Kafka record
// using the tenantID as the key and partition as the target partition
func EncodeStreamMetadata(partition int32, topic string, tenantID string, stream logproto.Stream, lastSeenAt time.Time) *kgo.Record {
// Validate stream
if stream.Labels == "" || stream.Hash == 0 {
return nil
}

// Get metadata from pool
metadata := metadataPool.Get().(*logproto.StreamMetadata)
defer metadataPool.Put(metadata)

// Transform stream into metadata
metadata.StreamHash = stream.Hash
metadata.LastSeenAt = lastSeenAt.UnixNano()

// Encode the metadata into a byte slice
value, err := metadata.Marshal()
if err != nil {
// Since we're in a function that returns a *kgo.Record, we can't return an error.
// The best we can do is return nil which will fail later.
return nil
}

return &kgo.Record{
Key: []byte(tenantID),
Value: value,
Partition: partition,
Topic: topic + metadataTopicSuffix,
}
}

// DecodeStreamMetadata decodes a Kafka record into a StreamMetadata.
// It returns the decoded metadata and any error encountered.
func DecodeStreamMetadata(record *kgo.Record) (*logproto.StreamMetadata, error) {
if record == nil {
return nil, errors.New("nil record")
}

if record.Value == nil {
return nil, errors.New("nil record value")
}

metadata := metadataPool.Get().(*logproto.StreamMetadata)
if err := metadata.Unmarshal(record.Value); err != nil {
metadataPool.Put(metadata)
return nil, fmt.Errorf("failed to unmarshal stream metadata: %w", err)
}

return metadata, nil
}
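The decode path can be exercised from any franz-go consumer subscribed to the derived "&lt;topic&gt;.metadata" topic. Below is a minimal sketch, not part of the commit: the broker address, consumer group, and base topic name "loki" are illustrative assumptions, while DecodeStreamMetadata, the tenant-ID key, and the ".metadata" suffix come from the code above.

    package main

    import (
        "context"
        "fmt"
        "log"

        "github.com/twmb/franz-go/pkg/kgo"

        "github.com/grafana/loki/v3/pkg/kafka"
    )

    func main() {
        // Assumed broker and group; the metadata topic is the write topic plus ".metadata".
        client, err := kgo.NewClient(
            kgo.SeedBrokers("localhost:9092"),
            kgo.ConsumerGroup("stream-metadata-readers"),
            kgo.ConsumeTopics("loki.metadata"),
        )
        if err != nil {
            log.Fatal(err)
        }
        defer client.Close()

        for {
            fetches := client.PollFetches(context.Background())
            if errs := fetches.Errors(); len(errs) > 0 {
                log.Printf("fetch errors: %v", errs)
            }
            fetches.EachRecord(func(r *kgo.Record) {
                md, err := kafka.DecodeStreamMetadata(r)
                if err != nil {
                    log.Printf("decode: %v", err)
                    return
                }
                // The record key is the tenant ID; the value carries the stream hash
                // and the last-seen timestamp in nanoseconds.
                fmt.Printf("tenant=%s hash=%d lastSeenAt=%d\n", r.Key, md.StreamHash, md.LastSeenAt)
            })
        }
    }

Since the pool used by DecodeStreamMetadata is unexported, callers outside pkg/kafka simply let the returned struct be garbage collected.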
115 changes: 115 additions & 0 deletions pkg/kafka/encoding_test.go
@@ -7,6 +7,7 @@ import (

"github.com/prometheus/prometheus/model/labels"
"github.com/stretchr/testify/require"
"github.com/twmb/franz-go/pkg/kgo"

"github.com/grafana/loki/v3/pkg/logproto"
)
@@ -149,3 +150,117 @@ func generateRandomString(length int) string {
}
return string(b)
}

func TestEncodeDecodeStreamMetadata(t *testing.T) {
tests := []struct {
name string
stream logproto.Stream
partition int32
topic string
tenantID string
lastSeenAt time.Time
expectErr bool
}{
{
name: "Valid metadata",
stream: logproto.Stream{
Labels: `{app="test"}`,
Hash: 12345,
},
partition: 1,
topic: "logs",
tenantID: "tenant-1",
lastSeenAt: time.Now().Truncate(time.Millisecond),
expectErr: false,
},
{
name: "Empty labels - should error",
stream: logproto.Stream{
Labels: "",
Hash: 67890,
},
partition: 2,
topic: "metrics",
tenantID: "tenant-2",
lastSeenAt: time.Now().Truncate(time.Millisecond),
expectErr: true,
},
{
name: "Zero hash - should error",
stream: logproto.Stream{
Labels: `{app="test"}`,
Hash: 0,
},
partition: 3,
topic: "traces",
tenantID: "tenant-3",
lastSeenAt: time.Now().Truncate(time.Millisecond),
expectErr: true,
},
{
name: "Empty labels and zero hash - should error",
stream: logproto.Stream{
Labels: "",
Hash: 0,
},
partition: 4,
topic: "traces",
tenantID: "tenant-4",
lastSeenAt: time.Now().Truncate(time.Millisecond),
expectErr: true,
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
// Encode metadata
record := EncodeStreamMetadata(tt.partition, tt.topic, tt.tenantID, tt.stream, tt.lastSeenAt)
if tt.expectErr {
require.Nil(t, record)
return
}

require.NotNil(t, record)
require.NotNil(t, record.Value)
require.Equal(t, tt.topic+metadataTopicSuffix, record.Topic)
require.Equal(t, tt.partition, record.Partition)
require.Equal(t, []byte(tt.tenantID), record.Key)

// Decode metadata
metadata, err := DecodeStreamMetadata(record)
require.NoError(t, err)
require.NotNil(t, metadata)

// Verify decoded values
require.Equal(t, tt.stream.Hash, metadata.StreamHash)
require.Equal(t, tt.lastSeenAt.UnixNano(), metadata.LastSeenAt)

// Return metadata to pool
metadataPool.Put(metadata)
})
}

t.Run("Decode nil record", func(t *testing.T) {
metadata, err := DecodeStreamMetadata(nil)
require.Error(t, err)
require.Nil(t, metadata)
})

t.Run("Decode nil value", func(t *testing.T) {
record := &kgo.Record{
Value: nil,
}
metadata, err := DecodeStreamMetadata(record)
require.Error(t, err)
require.Nil(t, metadata)
})

t.Run("Decode invalid value", func(t *testing.T) {
record := &kgo.Record{
Value: []byte("invalid data"),
}
metadata, err := DecodeStreamMetadata(record)
require.Error(t, err)
require.Nil(t, metadata)
})
}