Add timeout to endpointset metric collector
We have seen deadlocks in endpoint discovery caused by the metric
collector hanging and never releasing the store labels lock. This causes
the endpoint update to hang, which in turn blocks every endpoint reader
waiting to acquire a read lock on the resolved endpoints slice.

This commit makes sure the Collect method on the metric collector has
a built-in timeout to guard against cases where an upstream caller never
reads from the collection channel.
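
For context, here is a minimal, hypothetical sketch of the same pattern outside of Thanos: a custom Prometheus collector whose Collect holds a lock and guards each channel send with select and time.After. The connectionCollector type, its fields, and the example_store_grpc_connections metric are invented for illustration; only the select/time.After guard mirrors what this commit adds to endpointSetNodeCollector.

package example

import (
	"log"
	"sync"
	"time"

	"github.com/prometheus/client_golang/prometheus"
)

// connectionCollector is a toy collector. Its Collect holds a lock while
// emitting metrics, mirroring the situation described in the commit message.
type connectionCollector struct {
	mtx   sync.Mutex
	conns map[string]int // hypothetical: store type -> open gRPC connections
	desc  *prometheus.Desc
}

func newConnectionCollector() *connectionCollector {
	return &connectionCollector{
		conns: map[string]int{},
		desc: prometheus.NewDesc(
			"example_store_grpc_connections",
			"Open gRPC connections per store type (illustrative only).",
			[]string{"store_type"}, nil,
		),
	}
}

func (c *connectionCollector) Describe(ch chan<- *prometheus.Desc) {
	ch <- c.desc
}

func (c *connectionCollector) Collect(ch chan<- prometheus.Metric) {
	c.mtx.Lock()
	defer c.mtx.Unlock()

	for storeType, n := range c.conns {
		// Guard the send: if the scraper never drains ch, bail out after
		// one second instead of holding the lock forever.
		select {
		case ch <- prometheus.MustNewConstMetric(c.desc, prometheus.GaugeValue, float64(n), storeType):
		case <-time.After(1 * time.Second):
			log.Println("timed out collecting connection metrics")
			return
		}
	}
}

Registering such a collector with prometheus.MustRegister (or a dedicated prometheus.NewRegistry()) behaves as before on a healthy scrape; if a scrape stalls, Collect logs and returns after roughly a second, so goroutines queued on the same lock can make progress again.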

Signed-off-by: Filip Petkovski <[email protected]>
fpetkovski authored and hczhu-db committed Aug 22, 2024
1 parent 05cf10c commit 4c7f6a7
Showing 1 changed file with 25 additions and 3 deletions.
28 changes: 25 additions & 3 deletions pkg/query/endpointset.go
@@ -244,12 +244,13 @@ type endpointSetNodeCollector struct {
connectionsWithKeys *prometheus.Desc
}

func newEndpointSetNodeCollector(labels ...string) *endpointSetNodeCollector {
func newEndpointSetNodeCollector(logger log.Logger, labels ...string) *endpointSetNodeCollector {
if len(labels) == 0 {
labels = []string{string(ExternalLabels), string(StoreType)}
}
desc := "Number of gRPC connection to Store APIs. Opened connection means healthy store APIs available for Querier."
return &endpointSetNodeCollector{
logger: logger,
storeNodes: map[component.Component]map[string]int{},
connectionsDesc: prometheus.NewDesc(
"thanos_store_nodes_grpc_connections",
@@ -334,7 +335,28 @@ func (c *endpointSetNodeCollector) Collect(ch chan<- prometheus.Metric) {
lbls = append(lbls, storeTypeStr)
}
}
ch <- prometheus.MustNewConstMetric(c.connectionsDesc, prometheus.GaugeValue, float64(occurrences), lbls...)
select {
case ch <- prometheus.MustNewConstMetric(c.connectionsDesc, prometheus.GaugeValue, float64(occurrences), lbls...):
case <-time.After(1 * time.Second):
level.Warn(c.logger).Log("msg", "failed to collect endpointset metrics", "timeout", 1*time.Second)
return
}
}
}
for replicaKey, occurrencesPerAddr := range c.storeNodesAddr {
for addr, occurrences := range occurrencesPerAddr {
ch <- prometheus.MustNewConstMetric(
c.connectionsWithAddr, prometheus.GaugeValue,
float64(occurrences),
replicaKey, addr)
}
}
for groupKey, occurrencesPerReplicaKey := range c.storeNodesKeys {
for replicaKeys, occurrences := range occurrencesPerReplicaKey {
ch <- prometheus.MustNewConstMetric(
c.connectionsWithKeys, prometheus.GaugeValue,
float64(occurrences),
groupKey, replicaKeys)
}
}
for replicaKey, occurrencesPerAddr := range c.storeNodesAddr {
@@ -392,7 +414,7 @@ func NewEndpointSet(
endpointInfoTimeout time.Duration,
endpointMetricLabels ...string,
) *EndpointSet {
endpointsMetric := newEndpointSetNodeCollector(endpointMetricLabels...)
endpointsMetric := newEndpointSetNodeCollector(logger, endpointMetricLabels...)
if reg != nil {
reg.MustRegister(endpointsMetric)
}
