e2e regression tests: HTTP caching proxy, new cases #633

Merged · 13 commits · Feb 23, 2024
1 change: 0 additions & 1 deletion .dockerignore
@@ -9,6 +9,5 @@ docker/node/etc/genesis.json
tests/e2e/testnet/net-runner

# Local workdirs
-cache
.*
*.log
1 change: 0 additions & 1 deletion .gitignore
@@ -14,7 +14,6 @@ tests/e2e_regression/*/actual/*

# Log output.
**/*.log
-/cache
/logs

# API files.
17 changes: 9 additions & 8 deletions Makefile
@@ -110,14 +110,15 @@ accept-e2e-regression-suite:
ifndef SUITE
	$(error SUITE is undefined)
endif
-	@# Delete all old expected files first, in case any test cases were renamed or removed.
-	rm -rf ./tests/e2e_regression/$(SUITE)/expected
-	@# Copy the actual outputs to the expected outputs.
-	cp -r ./tests/e2e_regression/$(SUITE)/{actual,expected}
-	@# The result of the "spec" test is a special case. It should always match the
-	@# current openAPI spec file, so we symlink it to avoid having to update the expected
-	@# output every time the spec changes.
-	rm ./tests/e2e_regression/$(SUITE)/expected/spec.body
+	[[ -d ./tests/e2e_regression/$(SUITE)/actual ]] || { echo "Note: No actual outputs found for suite $(SUITE). Nothing to accept."; exit 0; } \
+	# Delete all old expected files first, in case any test cases were renamed or removed. \
+	rm -rf ./tests/e2e_regression/$(SUITE)/expected; \
+	# Copy the actual outputs to the expected outputs. \
+	cp -r ./tests/e2e_regression/$(SUITE)/{actual,expected}; \
+	# The result of the "spec" test is a special case. It should always match the \
+	# current openAPI spec file, so we symlink it to avoid having to update the expected \
+	# output every time the spec changes. \
+	rm -f ./tests/e2e_regression/$(SUITE)/expected/spec.body; \
	ln -s ../../../../api/spec/v1.yaml ./tests/e2e_regression/$(SUITE)/expected/spec.body

# Format code.
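Usage is unchanged, e.g. `make accept-e2e-regression-suite SUITE=<suite-name>` (suite name placeholder); the new guard simply turns the target into a no-op, with a printed note, when the suite has not produced any `actual/` outputs yet.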
41 changes: 27 additions & 14 deletions analyzer/evmverifier/evmverifier.go
@@ -4,6 +4,7 @@ import (
"context"
"encoding/json"
"fmt"
"strings"
"time"

ethCommon "github.com/ethereum/go-ethereum/common"
@@ -44,20 +45,28 @@ func NewAnalyzer(
logger.Warn("EVM contracts verifier only supports testnet/mainnet, stopping", "chain_name", chain)
return nil, fmt.Errorf("invalid chainName %s, expected one of testnet/mainnet", chain)
}
// Default interval is 5 minutes.
if cfg.Interval == 0 {
cfg.Interval = 5 * time.Minute
}
if cfg.Interval < time.Minute {
return nil, fmt.Errorf("invalid interval %s, evm contracts verifier interval must be at least 1 minute to meet Sourcify rate limits", cfg.Interval.String())
}
// interItemDelay should be at least 1 second to meet Sourcify rate limits.
if cfg.InterItemDelay == 0 {
cfg.InterItemDelay = time.Second
}
if cfg.InterItemDelay < time.Second {
return nil, fmt.Errorf("invalid interItemDelay %s, evm contracts verifier inter item delay must be at least 1 second to meet sourcify rate limits", cfg.InterItemDelay.String())

// Assuming we're running against "real" Sourcify, impose conservative request rates so we don't get banned.
// If we're running against localhost, assume it's backed by a local cache and proceed quickly.
// NOTE: We might hit Sourcify at very high rates if the local cache is empty, and the default intervals (0) are used.
// In my experiment with 26 contracts, that was not a problem - Sourcify did not have time to ban me.
if !(strings.Contains(sourcifyServerUrl, "localhost") || strings.Contains(sourcifyServerUrl, "127.0.0.1")) {
// Default interval is 5 minutes.
if cfg.Interval == 0 {
cfg.Interval = 5 * time.Minute
}
if cfg.Interval < time.Minute {
return nil, fmt.Errorf("invalid interval %s, evm contracts verifier interval must be at least 1 minute to meet Sourcify rate limits", cfg.Interval.String())
}
// interItemDelay should be at least 1 second to meet Sourcify rate limits.
if cfg.InterItemDelay == 0 {
cfg.InterItemDelay = time.Second
}
if cfg.InterItemDelay < time.Second {
return nil, fmt.Errorf("invalid interItemDelay %s, evm contracts verifier inter item delay must be at least 1 second to meet sourcify rate limits", cfg.InterItemDelay.String())
}
}

client, err := sourcify.NewClient(sourcifyServerUrl, chain, logger)
if err != nil {
return nil, err
@@ -194,7 +203,11 @@ func (p *processor) ProcessItem(ctx context.Context, batch *storage.QueryBatch,
	}

	batch.Queue(
-		queries.RuntimeEVMVerifyContractUpdate,
+		// NOTE: This also updates `verification_info_downloaded_at`, causing the `evm_abi` to re-parse
+		// the contract's txs and events.
+		// NOTE: We upsert rather than update; if the contract is not in the db yet, UPDATE would ignore the
+		// contract and this analyzer would keep retrying it on every iteration.
Comment on lines +208 to +209:

Collaborator: Hm is there a case where the verifier would upsert rather than update? The GetItems query fetches candidates from the chain.evm_contracts table originally.

Contributor (author): An insert can happen if sourcify has a contract that we haven't encountered yet, e.g. because we're scanning just a subset of blocks. GetItems no longer limits the items to chain.evm_contracts addresses as of #634 (which is not merged; I accidentally merged it into this branch and then undid that, but I'll duplicate the PR to merge into main once this PR goes in). The reasons for the change are pretty lightweight: we now don't have to worry about what happens as the number of contracts grows, and I found the new flow easier to understand, especially with the added partial/full verification.
+		queries.RuntimeEVMVerifyContractUpsert,
		p.runtime,
		item.Addr,
		abi.Output.ABI,
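Note that the rate-limit bypass is a plain substring match on the configured Sourcify URL: any URL containing `localhost` or `127.0.0.1` is treated as a cache-backed local server, and the interval/delay minimums are skipped entirely for it.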
78 changes: 41 additions & 37 deletions analyzer/item/item.go
@@ -32,11 +32,11 @@
var ErrEmptyBatch = errors.New("no items in batch")

type itemBasedAnalyzer[Item any] struct {
-	maxBatchSize     uint64
-	stopOnEmptyQueue bool
-	fixedInterval    time.Duration
-	interItemDelay   time.Duration
-	analyzerName     string
+	maxBatchSize        uint64
+	stopIfQueueEmptyFor time.Duration
+	fixedInterval       time.Duration
+	interItemDelay      time.Duration
+	analyzerName        string

	processor ItemProcessor[Item]

@@ -61,8 +61,8 @@ type ItemProcessor[Item any] interface {

// NewAnalyzer returns a new item based analyzer using the provided item processor.
//
-// If stopOnEmptyQueue is true, the analyzer will process batches of items until its
-// work queue is empty, at which point it will terminate and return. Likely to
+// If stopIfQueueEmptyFor is a non-zero duration, the analyzer will process batches of items until its
+// work queue is empty for `stopIfQueueEmptyFor`, at which point it will terminate and return. Likely to
// be used in the regression tests.
//
// If fixedInterval is provided, the analyzer will process one batch every fixedInterval.
@@ -79,15 +79,15 @@ func NewAnalyzer[Item any](
		cfg.BatchSize = defaultBatchSize
	}
	a := &itemBasedAnalyzer[Item]{
-		cfg.BatchSize,
-		cfg.StopOnEmptyQueue,
-		cfg.Interval,
-		cfg.InterItemDelay,
-		name,
-		processor,
-		target,
-		logger,
-		metrics.NewDefaultAnalysisMetrics(name),
+		maxBatchSize:        cfg.BatchSize,
+		stopIfQueueEmptyFor: cfg.StopIfQueueEmptyFor,
+		fixedInterval:       cfg.Interval,
+		interItemDelay:      cfg.InterItemDelay,
+		analyzerName:        name,
+		processor:           processor,
+		target:              target,
+		logger:              logger,
+		metrics:             metrics.NewDefaultAnalysisMetrics(name),
	}

	return a, nil
@@ -195,41 +195,45 @@ func (a *itemBasedAnalyzer[Item]) Start(ctx context.Context) {
		)
		return
	}
+	mostRecentTask := time.Now()

	for {
-		delay := backoff.Timeout()
-		if a.fixedInterval != 0 {
-			delay = a.fixedInterval
-		}
-		select {
-		case <-time.After(delay):
-			// Process another batch of items.
-		case <-ctx.Done():
-			a.logger.Warn("shutting down item analyzer", "reason", ctx.Err())
-			return
-		}
		// Update queueLength
		queueLength, err := a.sendQueueLengthMetric(ctx)
-		if err == nil && queueLength == 0 && a.stopOnEmptyQueue {
-			a.logger.Warn("item analyzer work queue is empty; shutting down")
+		// Stop if queue has been empty for a while, and configured to do so.
+		if err == nil && queueLength == 0 && a.stopIfQueueEmptyFor != 0 && time.Since(mostRecentTask) > a.stopIfQueueEmptyFor {
+			a.logger.Warn("item analyzer work queue has been empty for a while; shutting down",
+				"queue_empty_since", mostRecentTask,
+				"queue_empty_for", time.Since(mostRecentTask),
+				"stop_if_queue_empty_for", a.stopIfQueueEmptyFor)
			return
		}
		a.logger.Info("work queue length", "num_items", queueLength)

		numProcessed, err := a.processBatch(ctx)
-		if err != nil {
+		if err != nil { //nolint:gocritic
			a.logger.Error("error processing batch", "err", err)
			backoff.Failure()
-			continue
-		}
-		if numProcessed == 0 {
-			// Count this as a failure to reduce the polling when we are
-			// running faster than the block analyzer can find new tokens.
+		} else if numProcessed == 0 {
+			// We are running faster than work is being created. Reduce needless GetItems() calls.
			backoff.Failure()
-			continue
+		} else {
+			mostRecentTask = time.Now()
+			backoff.Success()
		}

-		backoff.Success()
+		// Sleep a little before the next batch.
+		delay := backoff.Timeout()
+		if a.fixedInterval != 0 {
+			delay = a.fixedInterval
+		}
+		select {
+		case <-time.After(delay):
+			// Process another batch of items.
+		case <-ctx.Done():
+			a.logger.Warn("shutting down item analyzer", "reason", ctx.Err())
+			return
+		}
	}
}

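Two consequences of the restructured loop: the backoff/interval sleep now happens at the end of each iteration, so the first batch is processed immediately on startup; and `mostRecentTask` is only refreshed when a batch actually contained items, which is what the `stopIfQueueEmptyFor` comparison measures against.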
8 changes: 4 additions & 4 deletions analyzer/item/item_test.go
@@ -50,10 +50,10 @@

// Default item based config.
var testItemBasedConfig = &config.ItemBasedAnalyzerConfig{
-	BatchSize:        3,
-	StopOnEmptyQueue: true,
-	Interval:         0, // use backoff
-	InterItemDelay:   0,
+	BatchSize:           3,
+	StopIfQueueEmptyFor: time.Second,
+	Interval:            0, // use backoff
+	InterItemDelay:      0,
}

type mockItem struct {
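For illustration, here is a minimal sketch of a config that makes an item analyzer self-terminate in regression runs. The field names are the ones from the diff above; the concrete values are hypothetical:

```go
cfg := config.ItemBasedAnalyzerConfig{
	BatchSize:           20,               // hypothetical: up to 20 items per batch
	StopIfQueueEmptyFor: 10 * time.Second, // hypothetical: exit once the queue stays empty this long
	Interval:            0,                // 0 = adaptive backoff between batches
	InterItemDelay:      0,                // no artificial delay between items
}
```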
22 changes: 15 additions & 7 deletions analyzer/queries/queries.go
@@ -925,15 +925,23 @@
	WHERE
		runtime = $1 AND verification_info_downloaded_at IS NULL`

-	RuntimeEVMVerifyContractUpdate = `
-		UPDATE chain.evm_contracts
+	RuntimeEVMVerifiedContracts = `
+		SELECT
+			contracts.contract_address,
+			contracts.verification_level
+		FROM chain.evm_contracts AS contracts
+		WHERE
+			runtime = $1 AND verification_level IS NOT NULL`
+
+	RuntimeEVMVerifyContractUpsert = `
+		INSERT INTO chain.evm_contracts (runtime, contract_address, verification_info_downloaded_at, abi, compilation_metadata, source_files)
+		VALUES ($1, $2, CURRENT_TIMESTAMP, $3, $4, $5)
+		ON CONFLICT (runtime, contract_address) DO UPDATE
		SET
			verification_info_downloaded_at = CURRENT_TIMESTAMP,
-			abi = $3,
-			compilation_metadata = $4,
-			source_files = $5
-		WHERE
-			runtime = $1 AND contract_address = $2`
+			abi = EXCLUDED.abi,
+			compilation_metadata = EXCLUDED.compilation_metadata,
+			source_files = EXCLUDED.source_files

	RuntimeEvmVerifiedContractTxs = `
		WITH abi_contracts AS (
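In PostgreSQL, `EXCLUDED` in an `ON CONFLICT ... DO UPDATE` clause refers to the row that was proposed for insertion, so on conflict the verification columns are overwritten with the freshly fetched values while all other columns of the existing row are left untouched.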
106 changes: 106 additions & 0 deletions cache/httpproxy/caching_http_proxy.go
@@ -0,0 +1,106 @@
package httpproxy

import (
	"bytes"
	"fmt"
	"io"
	"net/http"
	"net/url"
	"path/filepath"
	"regexp"
	"time"

	"github.com/oasisprotocol/nexus/cache/kvstore"
	cmdCommon "github.com/oasisprotocol/nexus/cmd/common"
	"github.com/oasisprotocol/nexus/config"
)

// cachingHttpProxy is a minimal HTTP handler that caches responses; it is intended *for testing only*
// and comes with several caveats:
// - The cache is never invalidated.
// - All requests are forwarded to the same target host.
// - Requests are cached based on the full URL, NOT the headers or the method (GET vs POST) or the request body.
// - Low-likelihood errors (e.g. malformed URLs) result in panics.
type cachingHttpProxy struct {
	client     *http.Client
	cache      kvstore.KVStore
	targetHost string // Must include the protocol (e.g. "http://")
}

var _ http.Handler = (*cachingHttpProxy)(nil)

type CacheableResponse struct {
	StatusCode int
	Header     http.Header
	Body       []byte
}

func (p cachingHttpProxy) ServeHTTP(w http.ResponseWriter, req *http.Request) {
	// Rewrite the request URL to the new host.
	targetURL, _ := url.JoinPath(p.targetHost, req.URL.RequestURI())
	req, err := http.NewRequestWithContext(req.Context(), req.Method, targetURL, req.Body)
	if err != nil {
		panic(fmt.Sprintf("error rewriting request to %s %s: %v", req.Method, req.URL.String(), err))
	}

	// Get the response to the modified request (via cache).
	cResp, err := kvstore.GetFromCacheOrCall(p.cache, false, kvstore.CacheKey(req.URL.RequestURI()), func() (*CacheableResponse, error) {
		resp, err2 := p.client.Do(req)
		if err2 != nil {
			return nil, err2
		}
		defer resp.Body.Close()
		body, err2 := io.ReadAll(resp.Body)
		if err2 != nil {
			return nil, fmt.Errorf("error reading response body from %s: %v", resp.Request.URL.String(), err2)
		}
		return &CacheableResponse{
			StatusCode: resp.StatusCode,
			Header:     resp.Header,
			Body:       body,
		}, nil
	})
	if err != nil {
		cResp = &CacheableResponse{
			StatusCode: http.StatusInternalServerError,
			Header:     http.Header{},
			Body:       []byte(fmt.Sprintf("error proxying request: %v", err)),
		}
	}

	// Write out the response to `w`.
	for key, values := range cResp.Header {
		for _, value := range values {
			w.Header().Add(key, value)
		}
	}
	w.WriteHeader(cResp.StatusCode) // 200, 404, etc
	if _, err = io.Copy(w, bytes.NewReader(cResp.Body)); err != nil {
		panic(err)
	}
}

// Creates a http.Server that proxies all requests to the target URL.
// The server caches all responses in a persisted key-value store, located in an
// autogenerated subdirectory of the cache root dir.
func NewHttpServer(cacheCfg config.CacheConfig, proxyCfg config.HttpCachingProxyConfig) (*http.Server, error) {
	// Derive the cache root dir from the target URL.
	cleanTargetUrl := regexp.MustCompile("[^a-zA-Z0-9]+").ReplaceAllString(proxyCfg.TargetURL, "_")
	cachePath := filepath.Join(cacheCfg.CacheDir, cleanTargetUrl)
	kvStore, err := kvstore.OpenKVStore(cmdCommon.RootLogger().WithModule("caching-http-proxy"), cachePath, nil)
	if err != nil {
		return nil, err
	}

	handler := cachingHttpProxy{
		client:     &http.Client{},
		cache:      kvStore,
		targetHost: proxyCfg.TargetURL,
	}

	return &http.Server{
		Addr:              proxyCfg.HostAddr,
		Handler:           handler,
		ReadHeaderTimeout: time.Second,
	}, nil
}
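A minimal usage sketch (not part of the PR) of standing up the proxy in a test harness. The `CacheDir`, `TargetURL`, and `HostAddr` fields are the ones `NewHttpServer` consumes above; the concrete values and the wiring are placeholders:

```go
package main

import (
	"log"

	"github.com/oasisprotocol/nexus/cache/httpproxy"
	"github.com/oasisprotocol/nexus/config"
)

func main() {
	srv, err := httpproxy.NewHttpServer(
		config.CacheConfig{CacheDir: "/tmp/nexus-test-cache"}, // assumed: only CacheDir is needed here
		config.HttpCachingProxyConfig{
			TargetURL: "https://sourcify.dev/server", // upstream to mirror (placeholder)
			HostAddr:  "localhost:9191",              // where the proxy listens (placeholder)
		},
	)
	if err != nil {
		log.Fatal(err)
	}
	// Analyzers can now be pointed at http://localhost:9191; repeated e2e runs
	// are served from the persisted on-disk cache.
	log.Fatal(srv.ListenAndServe())
}
```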