diff --git a/log/DESIGN.md b/log/DESIGN.md index 029bc19fc38..6cf2221b9c4 100644 --- a/log/DESIGN.md +++ b/log/DESIGN.md @@ -605,14 +605,6 @@ However, in this approach we would need have factory functions for both types. It would make the API surface unnecessarily big, and we may even have problems naming the functions. -## Open issues - -The Logs Bridge API MUST NOT be released as stable -before all issues below are closed: - -- [Clarify attributes parameter type of Get a Logger operation](https://github.com/open-telemetry/opentelemetry-specification/issues/3841) -- [Add an Enabled method to Logger](https://github.com/open-telemetry/opentelemetry-specification/issues/3917) - [^1]: [Handle structured body and attributes](https://github.com/pellared/opentelemetry-go/pull/7) [^2]: Jonathan Amsterdam, [The Go Blog: Structured Logging with slog](https://go.dev/blog/slog) [^3]: Jonathan Amsterdam, [GopherCon Europe 2023: A Fast Structured Logging Package](https://www.youtube.com/watch?v=tC4Jt3i62ns) diff --git a/sdk/log/DESIGN.md b/sdk/log/DESIGN.md index bd47d2683d7..41f1cdadf0f 100644 --- a/sdk/log/DESIGN.md +++ b/sdk/log/DESIGN.md @@ -121,15 +121,6 @@ provided via API. Moreover it is safer to have these abstraction decoupled. E.g. there can be a need for some fields that can be set via API and cannot be modified by the processors. -## Open issues - -The Logs SDK MUST NOT be released as stable before all issues below are closed: - -- [Clarify that ReadableLogRecord and ReadWriteLogRecord can be represented using a single type](https://github.com/open-telemetry/opentelemetry-specification/pull/3898) -- [Fix what can be modified via ReadWriteLogRecord](https://github.com/open-telemetry/opentelemetry-specification/pull/3907) -- [logs: Allow duplicate keys](https://github.com/open-telemetry/opentelemetry-specification/issues/3931) -- [Add an Enabled method to Logger](https://github.com/open-telemetry/opentelemetry-specification/issues/3917) - [^1]: [OpenTelemetry Logging](https://opentelemetry.io/docs/specs/otel/logs) [^2]: [Conversation on representing LogRecordProcessor and LogRecordExporter via a single Expoter interface](https://github.com/open-telemetry/opentelemetry-go/pull/4954#discussion_r1515050480) [^3]: [Introduce Processor](https://github.com/pellared/opentelemetry-go/pull/9) diff --git a/sdk/log/exporter.go b/sdk/log/exporter.go index 05deaeaa545..dff4dc9c28d 100644 --- a/sdk/log/exporter.go +++ b/sdk/log/exporter.go @@ -5,6 +5,8 @@ package log // import "go.opentelemetry.io/otel/sdk/log" import ( "context" + + "go.opentelemetry.io/otel" ) // Exporter handles the delivery of log records to external receivers. @@ -50,3 +52,51 @@ func (noopExporter) Export(context.Context, []Record) error { return nil } func (noopExporter) Shutdown(context.Context) error { return nil } func (noopExporter) ForceFlush(context.Context) error { return nil } + +// exportSync exports all data from input using exporter in a spawned +// goroutine. The returned chan will be closed when the spawned goroutine +// completes. +func exportSync(input <-chan exportData, exporter Exporter) (done chan struct{}) { + done = make(chan struct{}) + go func() { + defer close(done) + for data := range input { + data.DoExport(exporter.Export) + } + }() + return done +} + +// exportData is data related to an export. +type exportData struct { + ctx context.Context + records []Record + + // respCh is the channel any error returned from the export will be sent + // on. If this is nil, and the export error is non-nil, the error will + // passed to the OTel error handler. + respCh chan<- error +} + +// DoExport calls exportFn with the data contained in e. The error response +// will be returned on e's respCh if not nil. The error will be handled by the +// default OTel error handle if it is not nil and respCh is nil or full. +func (e exportData) DoExport(exportFn func(context.Context, []Record) error) { + if len(e.records) == 0 { + e.respond(nil) + return + } + + e.respond(exportFn(e.ctx, e.records)) +} + +func (e exportData) respond(err error) { + select { + case e.respCh <- err: + default: + // e.respCh is nil or busy, default to otel.Handler. + if err != nil { + otel.Handle(err) + } + } +} diff --git a/sdk/log/exporter_test.go b/sdk/log/exporter_test.go new file mode 100644 index 00000000000..4eb2056d1b2 --- /dev/null +++ b/sdk/log/exporter_test.go @@ -0,0 +1,198 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package log + +import ( + "context" + "slices" + "sync" + "sync/atomic" + "testing" + "time" + + "github.com/stretchr/testify/assert" + + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/log" +) + +type instruction struct { + Record *[]Record + Flush chan [][]Record +} + +type testExporter struct { + // Err is the error returned by all methods of the testExporter. + Err error + + // Counts of method calls. + exportN, shutdownN, forceFlushN *int32 + + input chan instruction + done chan struct{} +} + +func newTestExporter(err error) *testExporter { + e := &testExporter{ + Err: err, + exportN: new(int32), + shutdownN: new(int32), + forceFlushN: new(int32), + input: make(chan instruction), + } + e.done = run(e.input) + + return e +} + +func run(input chan instruction) chan struct{} { + done := make(chan struct{}) + go func() { + defer close(done) + + var records [][]Record + for in := range input { + if in.Record != nil { + records = append(records, *in.Record) + } + if in.Flush != nil { + cp := slices.Clone(records) + records = records[:0] + in.Flush <- cp + } + } + }() + return done +} + +func (e *testExporter) Records() [][]Record { + out := make(chan [][]Record, 1) + e.input <- instruction{Flush: out} + return <-out +} + +func (e *testExporter) Export(ctx context.Context, r []Record) error { + atomic.AddInt32(e.exportN, 1) + e.input <- instruction{Record: &r} + return e.Err +} + +func (e *testExporter) ExportN() int { + return int(atomic.LoadInt32(e.exportN)) +} + +func (e *testExporter) Stop() { + close(e.input) + <-e.done +} + +func (e *testExporter) Shutdown(ctx context.Context) error { + atomic.AddInt32(e.shutdownN, 1) + return e.Err +} + +func (e *testExporter) ShutdownN() int { + return int(atomic.LoadInt32(e.shutdownN)) +} + +func (e *testExporter) ForceFlush(ctx context.Context) error { + atomic.AddInt32(e.forceFlushN, 1) + return e.Err +} + +func (e *testExporter) ForceFlushN() int { + return int(atomic.LoadInt32(e.forceFlushN)) +} + +func TestExportSync(t *testing.T) { + eventuallyDone := func(t *testing.T, done chan struct{}) { + assert.Eventually(t, func() bool { + select { + case <-done: + return true + default: + return false + } + }, 2*time.Second, time.Microsecond) + } + + t.Run("ErrorHandler", func(t *testing.T) { + var got error + handler := otel.ErrorHandlerFunc(func(err error) { got = err }) + otel.SetErrorHandler(handler) + + in := make(chan exportData, 1) + exp := newTestExporter(assert.AnError) + t.Cleanup(exp.Stop) + done := exportSync(in, exp) + + var wg sync.WaitGroup + wg.Add(1) + go func() { + defer wg.Done() + + in <- exportData{ + ctx: context.Background(), + records: make([]Record, 1), + } + }() + + wg.Wait() + close(in) + eventuallyDone(t, done) + + assert.ErrorIs(t, got, assert.AnError, "error not passed to ErrorHandler") + }) + + t.Run("ConcurrentSafe", func(t *testing.T) { + in := make(chan exportData, 1) + exp := newTestExporter(assert.AnError) + t.Cleanup(exp.Stop) + done := exportSync(in, exp) + + const goRoutines = 10 + var wg sync.WaitGroup + wg.Add(goRoutines) + for i := 0; i < goRoutines; i++ { + go func(n int) { + defer wg.Done() + + var r Record + r.SetBody(log.IntValue(n)) + + resp := make(chan error, 1) + in <- exportData{ + ctx: context.Background(), + records: []Record{r}, + respCh: resp, + } + + assert.ErrorIs(t, <-resp, assert.AnError) + }(i) + } + + // Empty records should be ignored. + in <- exportData{ctx: context.Background()} + + wg.Wait() + + close(in) + eventuallyDone(t, done) + + assert.Equal(t, goRoutines, exp.ExportN(), "Export calls") + + want := make([]log.Value, goRoutines) + for i := range want { + want[i] = log.IntValue(i) + } + records := exp.Records() + got := make([]log.Value, len(records)) + for i := range got { + if assert.Len(t, records[i], 1, "number of records exported") { + got[i] = records[i][0].Body() + } + } + assert.ElementsMatch(t, want, got, "record bodies") + }) +}