Skip to content

Commit

Permalink
Merge pull request #12343 from hashicorp/dnephin/blocking-query-docs
Browse files Browse the repository at this point in the history
rpc: improve docs for blockingQuery
  • Loading branch information
dnephin authored Feb 15, 2022
2 parents fc71215 + 3301f94 commit 09d61e6
Show file tree
Hide file tree
Showing 3 changed files with 73 additions and 24 deletions.
7 changes: 4 additions & 3 deletions agent/consul/gateway_locator.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,15 @@ import (
"sync"
"time"

"github.com/hashicorp/go-hclog"
memdb "github.com/hashicorp/go-memdb"

"github.com/hashicorp/consul/agent/consul/state"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/api"
"github.com/hashicorp/consul/ipaddr"
"github.com/hashicorp/consul/lib/stringslice"
"github.com/hashicorp/consul/logging"
"github.com/hashicorp/go-hclog"
memdb "github.com/hashicorp/go-memdb"
)

// GatewayLocator assists in selecting an appropriate mesh gateway when wan
Expand Down Expand Up @@ -269,7 +270,7 @@ func getRandomItem(items []string) string {
}

type serverDelegate interface {
blockingQuery(queryOpts structs.QueryOptionsCompat, queryMeta structs.QueryMetaCompat, fn queryFn) error
blockingQuery(queryOpts blockingQueryOptions, queryMeta blockingQueryResponseMeta, fn queryFn) error
IsLeader() bool
LeaderLastContact() time.Time
setDatacenterSupportsFederationStates()
Expand Down
4 changes: 2 additions & 2 deletions agent/consul/gateway_locator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -493,8 +493,8 @@ func (d *testServerDelegate) datacenterSupportsFederationStates() bool {

// This is just enough to exercise the logic.
func (d *testServerDelegate) blockingQuery(
queryOpts structs.QueryOptionsCompat,
queryMeta structs.QueryMetaCompat,
queryOpts blockingQueryOptions,
queryMeta blockingQueryResponseMeta,
fn queryFn,
) error {
minQueryIndex := queryOpts.GetMinQueryIndex()
Expand Down
86 changes: 67 additions & 19 deletions agent/consul/rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ import (

"github.com/armon/go-metrics"
"github.com/armon/go-metrics/prometheus"
msgpackrpc "github.com/hashicorp/consul-net-rpc/net-rpc-msgpackrpc"
connlimit "github.com/hashicorp/go-connlimit"
"github.com/hashicorp/go-hclog"
memdb "github.com/hashicorp/go-memdb"
Expand All @@ -24,6 +23,8 @@ import (
"github.com/hashicorp/yamux"
"google.golang.org/grpc"

msgpackrpc "github.com/hashicorp/consul-net-rpc/net-rpc-msgpackrpc"

"github.com/hashicorp/consul/acl"
"github.com/hashicorp/consul/agent/consul/state"
"github.com/hashicorp/consul/agent/consul/wanfed"
Expand Down Expand Up @@ -910,35 +911,82 @@ func (s *Server) raftApplyWithEncoder(
return resp, nil
}

// queryFn is used to perform a query operation. If a re-query is needed, the
// passed-in watch set will be used to block for changes. The passed-in state
// store should be used (vs. calling fsm.State()) since the given state store
// will be correctly watched for changes if the state store is restored from
// a snapshot.
// queryFn is used to perform a query operation. See Server.blockingQuery for
// the requirements of this function.
type queryFn func(memdb.WatchSet, *state.Store) error

// blockingQuery is used to process a potentially blocking query operation.
func (s *Server) blockingQuery(queryOpts structs.QueryOptionsCompat, queryMeta structs.QueryMetaCompat, fn queryFn) error {
// blockingQueryOptions are options used by Server.blockingQuery to modify the
// behaviour of the query operation, or to populate response metadata.
type blockingQueryOptions interface {
GetToken() string
GetMinQueryIndex() uint64
GetMaxQueryTime() time.Duration
GetRequireConsistent() bool
}

// blockingQueryResponseMeta is an interface used to populate the response struct
// with metadata about the query and the state of the server.
type blockingQueryResponseMeta interface {
SetLastContact(time.Duration)
SetKnownLeader(bool)
GetIndex() uint64
SetIndex(uint64)
SetResultsFilteredByACLs(bool)
}

// blockingQuery performs a blocking query if opts.GetMinQueryIndex is
// greater than 0, otherwise performs a non-blocking query. Blocking queries will
// block until responseMeta.Index is greater than opts.GetMinQueryIndex,
// or opts.GetMaxQueryTime is reached. Non-blocking queries return immediately
// after performing the query.
//
// If opts.GetRequireConsistent is true, blockingQuery will first verify it is
// still the cluster leader before performing the query.
//
// The query function is expected to be a closure that has access to responseMeta
// so that it can set the Index. The actual result of the query is opaque to blockingQuery.
// If query function returns an error, the error is returned to the caller immediately.
//
// The query function must follow these rules:
//
// 1. to access data it must use the passed in state.Store.
// 2. it must set the responseMeta.Index to an index greater than
// opts.GetMinQueryIndex if the results return by the query have changed.
// 3. any channels added to the memdb.WatchSet must unblock when the results
// returned by the query have changed.
//
// To ensure optimal performance of the query, the query function should make a
// best-effort attempt to follow these guidelines:
//
// 1. only set responseMeta.Index to an index greater than
// opts.GetMinQueryIndex when the results returned by the query have changed.
// 2. any channels added to the memdb.WatchSet should only unblock when the
// results returned by the query have changed.
func (s *Server) blockingQuery(
opts blockingQueryOptions,
responseMeta blockingQueryResponseMeta,
query queryFn,
) error {
var ctx context.Context = &lib.StopChannelContext{StopCh: s.shutdownCh}

metrics.IncrCounter([]string{"rpc", "query"}, 1)

minQueryIndex := queryOpts.GetMinQueryIndex()
minQueryIndex := opts.GetMinQueryIndex()
// Perform a non-blocking query
if minQueryIndex == 0 {
if queryOpts.GetRequireConsistent() {
if opts.GetRequireConsistent() {
if err := s.consistentRead(); err != nil {
return err
}
}

var ws memdb.WatchSet
err := fn(ws, s.fsm.State())
s.setQueryMeta(queryMeta, queryOpts.GetToken())
err := query(ws, s.fsm.State())
s.setQueryMeta(responseMeta, opts.GetToken())
return err
}

timeout := s.rpcQueryTimeout(queryOpts.GetMaxQueryTime())
timeout := s.rpcQueryTimeout(opts.GetMaxQueryTime())
ctx, cancel := context.WithTimeout(ctx, timeout)
defer cancel()

Expand All @@ -948,7 +996,7 @@ func (s *Server) blockingQuery(queryOpts structs.QueryOptionsCompat, queryMeta s
defer atomic.AddUint64(&s.queriesBlocking, ^uint64(0))

for {
if queryOpts.GetRequireConsistent() {
if opts.GetRequireConsistent() {
if err := s.consistentRead(); err != nil {
return err
}
Expand All @@ -964,13 +1012,13 @@ func (s *Server) blockingQuery(queryOpts structs.QueryOptionsCompat, queryMeta s
// whole state store is abandoned.
ws.Add(state.AbandonCh())

err := fn(ws, state)
s.setQueryMeta(queryMeta, queryOpts.GetToken())
err := query(ws, state)
s.setQueryMeta(responseMeta, opts.GetToken())
if err != nil {
return err
}

if queryMeta.GetIndex() > minQueryIndex {
if responseMeta.GetIndex() > minQueryIndex {
return nil
}

Expand All @@ -992,7 +1040,7 @@ func (s *Server) blockingQuery(queryOpts structs.QueryOptionsCompat, queryMeta s
// setQueryMeta is used to populate the QueryMeta data for an RPC call
//
// Note: This method must be called *after* filtering query results with ACLs.
func (s *Server) setQueryMeta(m structs.QueryMetaCompat, token string) {
func (s *Server) setQueryMeta(m blockingQueryResponseMeta, token string) {
if s.IsLeader() {
m.SetLastContact(0)
m.SetKnownLeader(true)
Expand Down Expand Up @@ -1085,7 +1133,7 @@ func (s *Server) rpcQueryTimeout(queryTimeout time.Duration) time.Duration {
// will only check whether it is blank or not). It's a safe assumption because
// ResultsFilteredByACLs is only set to try when applying the already-resolved
// token's policies.
func maskResultsFilteredByACLs(token string, meta structs.QueryMetaCompat) {
func maskResultsFilteredByACLs(token string, meta blockingQueryResponseMeta) {
if token == "" {
meta.SetResultsFilteredByACLs(false)
}
Expand Down

0 comments on commit 09d61e6

Please sign in to comment.