Skip to content
This repository has been archived by the owner on Apr 2, 2024. It is now read-only.

Commit

Permalink
trace request logging
Browse files Browse the repository at this point in the history
  • Loading branch information
cevian committed Nov 17, 2022
1 parent 88b71b9 commit 41bc38e
Show file tree
Hide file tree
Showing 3 changed files with 43 additions and 0 deletions.
26 changes: 26 additions & 0 deletions pkg/log/log.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ func Init(cfg Config) error {
// NOTE: we add a level of indirection with our logging functions,
// so we need additional caller depth
logger = log.With(l, "ts", timestampFormat, "caller", log.Caller(4))
traceRequestEnabled = isTraceRequestEnabled()
return nil
}

Expand Down Expand Up @@ -205,3 +206,28 @@ func WarnRateLimited(keyvals ...interface{}) {
func DebugRateLimited(keyvals ...interface{}) {
rateLimit(debug, keyvals...)
}

var traceRequestEnabled bool

func isTraceRequestEnabled() bool {
value := os.Getenv("PROMSCALE_TRACE_REQUEST")
if value == "" {
return false
}
enabled, err := strconv.ParseBool(value)
if err != nil || !enabled {
//assume off
return false
}
return true
}

func TraceRequestEnabled() bool {
return traceRequestEnabled
}

func TraceRequest(keyvals ...interface{}) {
if TraceRequestEnabled() {
Debug(keyvals...)
}
}
13 changes: 13 additions & 0 deletions pkg/pgmodel/ingestor/dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"github.com/timescale/promscale/pkg/pgmodel/metrics"
"github.com/timescale/promscale/pkg/pgmodel/model"
"github.com/timescale/promscale/pkg/pgxconn"
"github.com/timescale/promscale/pkg/psctx"
"github.com/timescale/promscale/pkg/tracer"
tput "github.com/timescale/promscale/pkg/util/throughput"
)
Expand Down Expand Up @@ -245,6 +246,16 @@ func (p *pgxDispatcher) InsertTs(ctx context.Context, dataTS model.Data) (uint64
}
p.getMetricBatcher(metricName) <- &insertDataRequest{requestCtx: ctx, spanCtx: span.SpanContext(), metric: metricName, data: data, finished: workFinished, batched: batched, errChan: errChan}
}

var startTime time.Time
if log.TraceRequestEnabled() {
t, err := psctx.StartTime(ctx)
if err != nil {
log.TraceRequest("component", "dispatcher", "err", err)
}
startTime = t
log.TraceRequest("component", "dispatcher", "event", "start", "metrics", len(rows), "samples", numRows, "start_time", startTime.UnixNano())
}
span.SetAttributes(attribute.Int64("num_rows", int64(numRows)))
span.SetAttributes(attribute.Int("num_metrics", len(rows)))
reportIncomingBatch(numRows)
Expand All @@ -261,6 +272,7 @@ func (p *pgxDispatcher) InsertTs(ctx context.Context, dataTS model.Data) (uint64
case err = <-errChan:
default:
}
log.TraceRequest("component", "dispatcher", "event", "ack", "start_time", startTime.UnixNano(), "took", time.Since(startTime))
reportMetricsTelemetry(maxt, numRows, 0)
close(errChan)
} else {
Expand All @@ -275,6 +287,7 @@ func (p *pgxDispatcher) InsertTs(ctx context.Context, dataTS model.Data) (uint64
if err != nil {
log.Error("msg", fmt.Sprintf("error on async send, dropping %d datapoints", numRows), "err", err)
}
log.TraceRequest("component", "dispatcher", "event", "async_ack", "start_time", startTime.UnixNano(), "took", time.Since(startTime))
reportMetricsTelemetry(maxt, numRows, 0)
}()
}
Expand Down
4 changes: 4 additions & 0 deletions pkg/pgmodel/ingestor/reservation.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ import (
"sync"
"sync/atomic"
"time"

"github.com/timescale/promscale/pkg/log"
)

type reservation struct {
Expand Down Expand Up @@ -163,6 +165,7 @@ func (rq *ReservationQueue) Peek() (time.Time, bool) {
case <-waitch:
case <-time.After(250 * time.Millisecond):
}
log.TraceRequest("component", "reservation", "event", "peek", "batched_metrics", rq.q.Len(), "waited", waited, "took", time.Since((*rq.q)[0].GetStartTime()))
}
return reservation.GetStartTime(), ok
}
Expand Down Expand Up @@ -213,6 +216,7 @@ func (rq *ReservationQueue) PopOntoBatch(batch []readRequest) ([]readRequest, in
} else if !(len(batch) == 0 || items+total_items < 20000) {
reason = "size_samples"
}
log.TraceRequest("component", "reservation", "event", "pop", "reason", reason, "metrics", count, "items", total_items, "remaining_metrics", rq.q.Len())
return batch, count, reason
}

Expand Down

0 comments on commit 41bc38e

Please sign in to comment.