diff --git a/input/otlp/logs.go b/input/otlp/logs.go index 9538a6ba..29de04ba 100644 --- a/input/otlp/logs.go +++ b/input/otlp/logs.go @@ -118,7 +118,11 @@ func (c *Consumer) convertLogRecord( ) *modelpb.APMEvent { event := baseEvent.CloneVT() initEventLabels(event) - event.Timestamp = modelpb.FromTime(record.Timestamp().AsTime().Add(timeDelta)) + if record.Timestamp() == 0 { + event.Timestamp = modelpb.FromTime(record.ObservedTimestamp().AsTime().Add(timeDelta)) + } else { + event.Timestamp = modelpb.FromTime(record.Timestamp().AsTime().Add(timeDelta)) + } if event.Event == nil { event.Event = modelpb.EventFromVTPool() } diff --git a/input/otlp/logs_test.go b/input/otlp/logs_test.go index f12fc6e5..43844e46 100644 --- a/input/otlp/logs_test.go +++ b/input/otlp/logs_test.go @@ -467,6 +467,106 @@ func TestConsumerConsumeOTelEventLogs(t *testing.T) { assert.Equal(t, "MyEvent", processed[0].Event.Action) } +func TestConsumerConsumeOTelLogsWithTimestamp(t *testing.T) { + logs := plog.NewLogs() + resourceLogs := logs.ResourceLogs().AppendEmpty() + resourceAttrs := logs.ResourceLogs().At(0).Resource().Attributes() + resourceAttrs.PutStr(semconv.AttributeTelemetrySDKLanguage, "java") + scopeLogs := resourceLogs.ScopeLogs().AppendEmpty() + timestamp := pcommon.NewTimestampFromTime(time.UnixMilli(946684800000)) + + record1 := newLogRecord("") // no log body + record1.SetTimestamp(timestamp) + + record1.CopyTo(scopeLogs.LogRecords().AppendEmpty()) + + var processed modelpb.Batch + var processor modelpb.ProcessBatchFunc = func(_ context.Context, batch *modelpb.Batch) error { + if processed != nil { + panic("already processes batch") + } + processed = batch.Clone() + return nil + } + consumer := otlp.NewConsumer(otlp.ConsumerConfig{ + Processor: processor, + Semaphore: semaphore.NewWeighted(100), + }) + result, err := consumer.ConsumeLogsWithResult(context.Background(), logs) + assert.NoError(t, err) + assert.Equal(t, otlp.ConsumeLogsResult{}, result) + + assert.Len(t, processed, 1) + assert.Equal(t, int(timestamp.AsTime().UnixNano()), int(processed[0].Timestamp)) +} + +func TestConsumerConsumeOTelLogsWithoutTimestamp(t *testing.T) { + logs := plog.NewLogs() + resourceLogs := logs.ResourceLogs().AppendEmpty() + resourceAttrs := logs.ResourceLogs().At(0).Resource().Attributes() + resourceAttrs.PutStr(semconv.AttributeTelemetrySDKLanguage, "java") + scopeLogs := resourceLogs.ScopeLogs().AppendEmpty() + timestamp := pcommon.NewTimestampFromTime(time.UnixMilli(0)) + + record1 := newLogRecord("") // no log body + record1.SetTimestamp(0) + + record1.CopyTo(scopeLogs.LogRecords().AppendEmpty()) + + var processed modelpb.Batch + var processor modelpb.ProcessBatchFunc = func(_ context.Context, batch *modelpb.Batch) error { + if processed != nil { + panic("already processes batch") + } + processed = batch.Clone() + return nil + } + consumer := otlp.NewConsumer(otlp.ConsumerConfig{ + Processor: processor, + Semaphore: semaphore.NewWeighted(100), + }) + result, err := consumer.ConsumeLogsWithResult(context.Background(), logs) + assert.NoError(t, err) + assert.Equal(t, otlp.ConsumeLogsResult{}, result) + + assert.Len(t, processed, 1) + assert.Equal(t, int(timestamp.AsTime().UnixNano()), int(processed[0].Timestamp)) +} + +func TestConsumerConsumeOTelLogsWithObservedTimestampWithoutTimestamp(t *testing.T) { + logs := plog.NewLogs() + resourceLogs := logs.ResourceLogs().AppendEmpty() + resourceAttrs := logs.ResourceLogs().At(0).Resource().Attributes() + resourceAttrs.PutStr(semconv.AttributeTelemetrySDKLanguage, "java") + scopeLogs := resourceLogs.ScopeLogs().AppendEmpty() + observedTimestamp := pcommon.NewTimestampFromTime(time.UnixMilli(946684800000)) + + record1 := newLogRecord("") // no log body + record1.SetTimestamp(0) + record1.SetObservedTimestamp(observedTimestamp) + + record1.CopyTo(scopeLogs.LogRecords().AppendEmpty()) + + var processed modelpb.Batch + var processor modelpb.ProcessBatchFunc = func(_ context.Context, batch *modelpb.Batch) error { + if processed != nil { + panic("already processes batch") + } + processed = batch.Clone() + return nil + } + consumer := otlp.NewConsumer(otlp.ConsumerConfig{ + Processor: processor, + Semaphore: semaphore.NewWeighted(100), + }) + result, err := consumer.ConsumeLogsWithResult(context.Background(), logs) + assert.NoError(t, err) + assert.Equal(t, otlp.ConsumeLogsResult{}, result) + + assert.Len(t, processed, 1) + assert.Equal(t, int(observedTimestamp.AsTime().UnixNano()), int(processed[0].Timestamp)) +} + func TestConsumerConsumeLogsLabels(t *testing.T) { logs := plog.NewLogs() resourceLogs := logs.ResourceLogs().AppendEmpty()