Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(spanner): add OpenTelemetry implementation #9254

Merged
Merged
Show file tree
Hide file tree
Changes from 33 commits
Commits
Show all changes
53 commits
Select commit Hold shift + click to select a range
f29d6fb
feat(spanner): add opentelemetry instrumentation
harshachinta Jan 12, 2024
c95c517
Merge branch 'main' into open-telemetry-instrumentation
harshachinta Jan 12, 2024
5da2787
feat(spanner): add header
harshachinta Jan 12, 2024
67dd928
feat(spanner): go mod tidy
harshachinta Jan 12, 2024
dc37d63
Merge branch 'main' into open-telemetry-instrumentation
harshachinta Jan 13, 2024
e838394
feat(spanner): code refactoring
harshachinta Jan 13, 2024
dd99e8a
feat(spanner): pass context
harshachinta Jan 13, 2024
0a05776
feat(spanner): fix vet
harshachinta Jan 13, 2024
2dfd049
feat(spanner): code refactoring
harshachinta Jan 13, 2024
1c85426
feat(spanner): add lock when setting ot config to avoid data race
harshachinta Jan 13, 2024
a56164f
feat(spanner): code refactoring
harshachinta Jan 13, 2024
08c292c
feat(spanner): add new package for testing open telemetry
harshachinta Jan 13, 2024
05548b5
feat(spanner): add header to new files
harshachinta Jan 13, 2024
2baa345
feat(spanner): testing
harshachinta Jan 13, 2024
3ff530d
feat(spanner): mark go version as 1.19
harshachinta Jan 13, 2024
593bbaa
feat(spanner): update metrics test cases
harshachinta Jan 14, 2024
e94a38e
feat(spanner): metrics code refactoring
harshachinta Jan 16, 2024
6d55d2a
feat(spanner): add OT traces
harshachinta Jan 16, 2024
e15684e
feat(spanner): comment OT metric sdk
harshachinta Jan 16, 2024
077393b
feat(spanner): hardcode context
harshachinta Jan 16, 2024
522b3d0
feat(spanner): code refactoring
harshachinta Jan 16, 2024
85a3117
feat(spanner): add header
harshachinta Jan 16, 2024
90e8f22
feat(spanner): code refactoring
harshachinta Jan 16, 2024
c0e57cd
feat(spanner): comment refactoring
harshachinta Jan 16, 2024
6c494d0
feat(spanner): code refactoring
harshachinta Jan 16, 2024
9c02578
feat(spanner): remove disable from integration_test
harshachinta Jan 16, 2024
99260f4
feat(spanner): rename metrics with prefix 1
harshachinta Jan 17, 2024
c87da45
feat(spanner): should revert: add benchmark code
harshachinta Jan 17, 2024
a517d20
feat(spanner): should revert: add benchmark code
harshachinta Jan 17, 2024
8e6b124
feat(spanner): should revert: add benchmark code
harshachinta Jan 17, 2024
52228a5
feat(spanner): should revert: add benchmark code
harshachinta Jan 17, 2024
2409bd0
feat(spanner): should revert: add benchmark code
harshachinta Jan 18, 2024
5e1e56c
feat(spanner): should revert: add benchmark code
harshachinta Jan 18, 2024
ea434de
feat(spanner): should revert: add benchmark code
harshachinta Jan 18, 2024
96468c6
feat(spanner): should revert: add benchmark code
harshachinta Jan 18, 2024
febf7c0
feat(spanner): revert all benchmark codes
harshachinta Jan 18, 2024
c51519f
Merge branch 'main' into open-telemetry-instrumentation
harshachinta Jan 18, 2024
b9287bc
feat(spanner): upgrade metrics SDK to latest 1.22.0 that has fix for …
harshachinta Jan 18, 2024
e693353
feat(spanner): rename metrics back to original name
harshachinta Jan 19, 2024
73ba24b
feat(spanner): remove debugging logs
harshachinta Jan 19, 2024
fbf1d59
feat(spanner): rename method
harshachinta Jan 19, 2024
c7a8580
feat(spanner): pass context that is done, OT team has fixed this issue
harshachinta Jan 19, 2024
45d3c2d
Merge branch 'main' into open-telemetry-instrumentation
harshachinta Jan 19, 2024
8083608
feat(spanner): skip OT tests for go version 1.19
harshachinta Jan 20, 2024
a29e471
Merge branch 'main' into open-telemetry-instrumentation
harshachinta Jan 20, 2024
e252c17
feat(spanner): revert
harshachinta Jan 20, 2024
4834124
feat(spanner): revert
harshachinta Jan 20, 2024
38e13b6
feat(spanner): avoid skipping tests for Go1.19
harshachinta Jan 20, 2024
6695f01
feat(spanner): add build constraints - compile OT test files only for …
harshachinta Jan 20, 2024
c856096
feat(spanner): add deprecation warning to OpenCensus code
harshachinta Jan 20, 2024
0bc1fc4
Merge branch 'main' into open-telemetry-instrumentation
harshachinta Feb 7, 2024
eafd2cc
feat(spanner): go mod tidy
harshachinta Feb 7, 2024
d8f02b6
Merge branch 'main' into open-telemetry-instrumentation
harshachinta Feb 8, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions go.work
Original file line number Diff line number Diff line change
Expand Up @@ -163,4 +163,5 @@ use (
./websecurityscanner
./workflows
./workstations
./spanner/test/opentelemetry/test
)
181 changes: 181 additions & 0 deletions go.work.sum

Large diffs are not rendered by default.

15 changes: 15 additions & 0 deletions spanner/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,9 @@ func (t *BatchReadOnlyTransaction) PartitionReadUsingIndexWithOptions(ctx contex
trace.TracePrintf(ctx, nil, "Error in recording GFE Latency. Try disabling and rerunning. Error: %v", err)
}
}
if metricErr := recordGFELatencyMetricsOT(ctx, md, "PartitionReadUsingIndexWithOptions", t.otConfig); metricErr != nil {
harshachinta marked this conversation as resolved.
Show resolved Hide resolved
trace.TracePrintf(ctx, nil, "Error in recording GFE Latency through OpenTelemetry. Error: %v", metricErr)
}
// Prepare ReadRequest.
req := &sppb.ReadRequest{
Session: sid,
Expand Down Expand Up @@ -213,6 +216,9 @@ func (t *BatchReadOnlyTransaction) partitionQuery(ctx context.Context, statement
trace.TracePrintf(ctx, nil, "Error in recording GFE Latency. Try disabling and rerunning. Error: %v", err)
}
}
if metricErr := recordGFELatencyMetricsOT(ctx, md, "partitionQuery", t.otConfig); metricErr != nil {
trace.TracePrintf(ctx, nil, "Error in recording GFE Latency through OpenTelemetry. Error: %v", metricErr)
}

// prepare ExecuteSqlRequest
r := &sppb.ExecuteSqlRequest{
Expand Down Expand Up @@ -284,6 +290,9 @@ func (t *BatchReadOnlyTransaction) Cleanup(ctx context.Context) {
trace.TracePrintf(ctx, nil, "Error in recording GFE Latency. Try disabling and rerunning. Error: %v", err)
}
}
if metricErr := recordGFELatencyMetricsOT(ctx, md, "Cleanup", t.otConfig); metricErr != nil {
trace.TracePrintf(ctx, nil, "Error in recording GFE Latency through OpenTelemetry. Error: %v", metricErr)
}

if err != nil {
var logger *log.Logger
Expand Down Expand Up @@ -336,6 +345,9 @@ func (t *BatchReadOnlyTransaction) Execute(ctx context.Context, p *Partition) *R
trace.TracePrintf(ctx, nil, "Error in recording GFE Latency. Try disabling and rerunning. Error: %v", err)
}
}
if metricErr := recordGFELatencyMetricsOT(ctx, md, "Execute", t.otConfig); metricErr != nil {
trace.TracePrintf(ctx, nil, "Error in recording GFE Latency through OpenTelemetry. Error: %v", metricErr)
}
return client, err
}
} else {
Expand Down Expand Up @@ -363,6 +375,9 @@ func (t *BatchReadOnlyTransaction) Execute(ctx context.Context, p *Partition) *R
trace.TracePrintf(ctx, nil, "Error in recording GFE Latency. Try disabling and rerunning. Error: %v", err)
}
}
if metricErr := recordGFELatencyMetricsOT(ctx, md, "Execute", t.otConfig); metricErr != nil {
trace.TracePrintf(ctx, nil, "Error in recording GFE Latency through OpenTelemetry. Error: %v", metricErr)
}
return client, err
}
}
Expand Down
250 changes: 250 additions & 0 deletions spanner/benchmarks_oc_ot_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,250 @@
/*
Copyright 2020 Google LLC

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package spanner

import (
"context"
"fmt"
harshachinta marked this conversation as resolved.
Show resolved Hide resolved
"log"
"math/rand"
"sort"
"sync"
"testing"
"time"

"go.opencensus.io/trace"
"google.golang.org/api/option"

"contrib.go.opencensus.io/exporter/stackdriver"
"go.opentelemetry.io/otel/sdk/metric"
"google.golang.org/api/iterator"
)

// Shared state and tunables for the real-server burst benchmarks.
var (
	// muElapsedTimes is locked by storeElapsedTime and reportBenchmarkResults
	// around their accesses to elapsedTimes.
	muElapsedTimes sync.Mutex
	// elapsedTimes accumulates one latency sample per recorded operation.
	elapsedTimes []time.Duration

	// selectQuery and updateQuery are the statements issued by the read and
	// write workers; both take a single @id parameter.
	selectQuery = "SELECT ID FROM BENCHMARK WHERE ID = @id"
	updateQuery = "UPDATE BENCHMARK SET BAR=1 WHERE ID = @id"
	// idColumnName is the name of the ID parameter.
	idColumnName = "id"

	// randomSearchSpace is the exclusive upper bound passed to rand.Intn when
	// picking a random ID.
	randomSearchSpace = 99999
	// totalReadsPerThread and totalUpdatesPerThread are multiplied by
	// parallelThreads to get the number of jobs queued per benchmark iteration.
	totalReadsPerThread   = 30000
	totalUpdatesPerThread = 10000
	// parallelThreads is the number of reader goroutines and, separately, the
	// number of writer goroutines started per iteration.
	parallelThreads = 5
)

// createBenchmarkActualServer builds a Client for database against the
// staging endpoint, using a session pool of MinOpened=100 / MaxOpened=400 and
// the given incStep. If mp is non-nil it is installed as the client's
// OpenTelemetry meter provider.
//
// On success it returns only after waitFor reports that the idle session list
// holds MinOpened sessions. If the client cannot be created, the error is
// returned immediately with a nil client; the caller is expected to report it.
func createBenchmarkActualServer(ctx context.Context, incStep uint64, clientConfig ClientConfig, database string, mp *metric.MeterProvider) (client *Client, err error) {
	clientConfig.SessionPoolConfig = SessionPoolConfig{
		MinOpened: 100,
		MaxOpened: 400,
		incStep:   incStep,
	}
	if mp != nil {
		clientConfig.OpenTelemetryMeterProvider = mp
	}
	options := []option.ClientOption{option.WithEndpoint("staging-wrenchworks.sandbox.googleapis.com:443")}
	client, err = NewClientWithConfig(ctx, database, clientConfig, options...)
	if err != nil {
		// Bail out here: continuing would dereference a nil client below.
		return nil, err
	}
	log.Printf("New client initialized")

	// waitFor needs a *testing.T; a throwaway one is used because this helper
	// is driven from a benchmark.
	t := &testing.T{}
	// Wait until the session pool has been initialized.
	waitFor(t, func() error {
		if uint64(client.idleSessions.idleList.Len()) == client.idleSessions.MinOpened {
			return nil
		}
		return fmt.Errorf("not yet initialized")
	})
	return client, nil
}

// readWorkerReal consumes jobs until the channel is closed. For every job it
// runs one randomised single-use read query, drains the result iterator, and
// sends the number of rows read on results.
//
// Exactly one value is sent on results per job, even when the query fails, so
// that a collector waiting for one result per job cannot block forever.
// Failures are reported with b.Error rather than b.Fatal because this function
// runs on a worker goroutine, and the testing package only allows FailNow/Fatal
// on the goroutine running the benchmark function. Latency is recorded only
// for queries that completed successfully.
func readWorkerReal(client *Client, b *testing.B, jobs <-chan int, results chan<- int) {
	for range jobs {
		startTime := time.Now()
		rows, err := func() (int, error) {
			iter := client.Single().Query(context.Background(), getRandomisedReadStatement())
			// Always release the iterator, including on the error path.
			defer iter.Stop()
			count := 0
			for {
				_, err := iter.Next()
				if err == iterator.Done {
					return count, nil
				}
				if err != nil {
					return count, err
				}
				count++
			}
		}()
		if err != nil {
			b.Error(err)
		} else {
			storeElapsedTime(time.Since(startTime))
		}

		// Report the number of rows read for this query.
		results <- rows
	}
}

// writeWorkerReal consumes jobs until the channel is closed. For every job it
// runs one randomised UPDATE inside a read-write transaction and sends the
// update count on results.
//
// Exactly one value is sent on results per job, even when the transaction
// fails (in which case 0 is sent), so that a collector waiting for one result
// per job cannot block forever. Failures are reported with b.Error rather than
// b.Fatal because this function runs on a worker goroutine, and the testing
// package only allows FailNow/Fatal on the goroutine running the benchmark
// function. Latency is recorded only for transactions that succeeded.
func writeWorkerReal(client *Client, b *testing.B, jobs <-chan int, results chan<- int64) {
	for range jobs {
		startTime := time.Now()
		var updateCount int64
		_, err := client.ReadWriteTransaction(context.Background(), func(ctx context.Context, transaction *ReadWriteTransaction) error {
			var updateErr error
			updateCount, updateErr = transaction.Update(ctx, getRandomisedUpdateStatement())
			return updateErr
		})
		if err != nil {
			b.Error(err)
			// Do not count rows from a transaction that did not succeed.
			updateCount = 0
		} else {
			storeElapsedTime(time.Since(startTime))
		}

		results <- updateCount
	}
}

// BenchmarkClientBurstReadWriteIncStep25RealServerOpenCensus runs the burst
// read/write workload against a real database with OpenCensus stat views, the
// GFE latency view and a Stackdriver exporter enabled, using incStep 25.
//
// Setup failures fail the benchmark through b.Fatalf instead of terminating
// the whole test process. The exporter error is checked before the exporter is
// used, and the exporter is flushed and stopped even if the workload aborts.
func BenchmarkClientBurstReadWriteIncStep25RealServerOpenCensus(b *testing.B) {
	b.Logf("Running Burst Write Benchmark With OpenCensus instrumentation")
	if err := EnableStatViews(); err != nil {
		b.Fatalf("Failed: %v", err)
	}
	if err := EnableGfeLatencyView(); err != nil {
		b.Fatalf("Failed: %v", err)
	}
	elapsedTimes = []time.Duration{}

	// Create OpenCensus Stackdriver exporter.
	sd, err := stackdriver.NewExporter(stackdriver.Options{
		ProjectID:         "span-cloud-testing",
		ReportingInterval: 10 * time.Second,
		//TraceSpansBufferMaxBytes: 100,
		BundleDelayThreshold: 50 * time.Millisecond,
		BundleCountThreshold: 5000,
	})
	if err != nil {
		// sd is unusable on error; stop before touching it.
		b.Fatalf("Failed: %v", err)
	}
	if err := sd.StartMetricsExporter(); err != nil {
		b.Fatalf("Failed: %v", err)
	}
	// Deferred so the exporter is flushed and stopped even when the workload
	// ends through b.Fatalf.
	defer func() {
		sd.Flush()
		sd.StopMetricsExporter()
	}()

	// Register it as a trace exporter
	trace.RegisterExporter(sd)
	trace.ApplyConfig(trace.Config{DefaultSampler: trace.AlwaysSample()})

	meterProvider := setupAndEnableOT()
	burstReadAndWrite(b, 25, "projects/span-cloud-testing/instances/harsha-test-gcloud/databases/database1", meterProvider)
}

// burstReadAndWrite runs the burst read/write workload b.N times. Each
// iteration creates its own client and closes it when the iteration ends.
func burstReadAndWrite(b *testing.B, incStep uint64, database string, mp *metric.MeterProvider) {
	for n := 0; n < b.N; n++ {
		runBurstReadAndWriteOnce(b, incStep, database, mp)
	}
}

// runBurstReadAndWriteOnce performs a single iteration of the workload: it
// creates a client, starts parallelThreads writer and reader goroutines,
// queues parallelThreads*totalUpdatesPerThread updates and
// parallelThreads*totalReadsPerThread queries, collects one result per job and
// logs the totals and latency percentiles. The client is closed on every exit
// path, including when b.Fatalf aborts the iteration.
func runBurstReadAndWriteOnce(b *testing.B, incStep uint64, database string, mp *metric.MeterProvider) {
	log.Printf("burstReadAndWrite called once")
	client, err := createBenchmarkActualServer(context.Background(), incStep, ClientConfig{}, database, mp)
	if err != nil {
		b.Fatalf("Failed to initialize the client: error : %q", err)
	}
	defer client.Close()

	sp := client.idleSessions
	if uint64(sp.idleList.Len()) != sp.MinOpened {
		b.Fatalf("session count mismatch\nGot: %d\nWant: %d", sp.idleList.Len(), sp.MinOpened)
	}

	// Job and result channels are buffered to the full job count so that
	// queueing jobs never blocks on slow workers.
	totalUpdates := parallelThreads * totalUpdatesPerThread
	writeJobs := make(chan int, totalUpdates)
	writeResults := make(chan int64, totalUpdates)
	parallelWrites := parallelThreads

	totalQueries := parallelThreads * totalReadsPerThread
	readJobs := make(chan int, totalQueries)
	readResults := make(chan int, totalQueries)
	parallelReads := parallelThreads

	for w := 0; w < parallelWrites; w++ {
		go writeWorkerReal(client, b, writeJobs, writeResults)
	}
	for j := 0; j < totalUpdates; j++ {
		writeJobs <- j
	}
	for w := 0; w < parallelReads; w++ {
		go readWorkerReal(client, b, readJobs, readResults)
	}
	for j := 0; j < totalQueries; j++ {
		readJobs <- j
	}

	// Closing the job channels lets the workers' range loops terminate.
	close(writeJobs)
	close(readJobs)

	totalUpdatedRows := int64(0)
	for a := 0; a < totalUpdates; a++ {
		totalUpdatedRows = totalUpdatedRows + <-writeResults
	}
	b.Logf("Total Updates: %d", totalUpdatedRows)
	totalReadRows := 0
	for a := 0; a < totalQueries; a++ {
		totalReadRows = totalReadRows + <-readResults
	}
	b.Logf("Total Reads: %d", totalReadRows)
	reportBenchmarkResults(b, sp)
}

// reportBenchmarkResults logs the current idle session count of sp and the
// sample count, P50, P95 and P99 of the latencies collected in elapsedTimes.
// It sorts elapsedTimes in place and resets it to nil before returning. Both
// sp.mu and muElapsedTimes are held for the duration of the call.
func reportBenchmarkResults(b *testing.B, sp *sessionPool) {
	sp.mu.Lock()
	defer sp.mu.Unlock()
	b.Logf("NumSessions: %d\t", sp.idleList.Len())

	muElapsedTimes.Lock()
	defer muElapsedTimes.Unlock()

	// percentile expects its input in ascending order.
	ascending := func(a, c int) bool { return elapsedTimes[a] < elapsedTimes[c] }
	sort.Slice(elapsedTimes, ascending)

	sampleCount := len(elapsedTimes)
	b.Logf("Total number of queries: %d\n", sampleCount)

	p50 := percentile(50, elapsedTimes)
	b.Logf("P50: %q\n", p50)
	p95 := percentile(95, elapsedTimes)
	b.Logf("P95: %q\n", p95)
	p99 := percentile(99, elapsedTimes)
	b.Logf("P99: %q\n", p99)

	// Start the next run with an empty sample set.
	elapsedTimes = nil
}

// percentile returns the element of orderedResults at position
// percentile*len(orderedResults)/100. orderedResults must already be sorted in
// ascending order.
//
// An empty input yields 0 instead of panicking, and the computed position is
// clamped to the valid index range, so percentile values below 0 return the
// first element and values of 100 or more return the last element.
func percentile(percentile int, orderedResults []time.Duration) time.Duration {
	count := len(orderedResults)
	if count == 0 {
		return 0
	}
	index := percentile * count / 100
	if index < 0 {
		index = 0
	}
	if index >= count {
		// percentile >= 100 would otherwise index one past the end.
		index = count - 1
	}
	return orderedResults[index]
}

// storeElapsedTime appends elapsedTime to the package-level elapsedTimes
// slice while holding muElapsedTimes.
func storeElapsedTime(elapsedTime time.Duration) {
	muElapsedTimes.Lock()
	elapsedTimes = append(elapsedTimes, elapsedTime)
	muElapsedTimes.Unlock()
}

// getRandomisedReadStatement builds a Statement from selectQuery whose "id"
// parameter is a random integer in [0, randomSearchSpace).
func getRandomisedReadStatement() Statement {
	query := NewStatement(selectQuery)
	query.Params["id"] = rand.Intn(randomSearchSpace)
	return query
}

// getRandomisedUpdateStatement builds a Statement from updateQuery whose "id"
// parameter is a random integer in [0, randomSearchSpace).
func getRandomisedUpdateStatement() Statement {
	update := NewStatement(updateQuery)
	update.Params["id"] = rand.Intn(randomSearchSpace)
	return update
}
Loading
Loading