Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

test: enables local tracing in e2e benchmark tests #3514

Merged
merged 20 commits into from
Jun 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 27 additions & 0 deletions test/e2e/benchmark/benchmark.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"time"

"github.com/celestiaorg/celestia-app/v2/test/e2e/testnet"
"github.com/tendermint/tendermint/pkg/trace"
)

type BenchmarkTest struct {
Expand Down Expand Up @@ -54,6 +55,7 @@ func (b *BenchmarkTest) SetupNodes() error {
err = b.CreateTxClients(b.manifest.TxClientVersion,
b.manifest.BlobSequences,
b.manifest.BlobSizes,
b.manifest.BlobsPerSeq,
b.manifest.TxClientsResource, gRPCEndpoints)
testnet.NoError("failed to create tx clients", err)

Expand All @@ -63,7 +65,32 @@ func (b *BenchmarkTest) SetupNodes() error {
testnet.WithTimeoutPropose(b.manifest.TimeoutPropose),
testnet.WithTimeoutCommit(b.manifest.TimeoutCommit),
testnet.WithPrometheus(b.manifest.Prometheus),
testnet.WithLocalTracing(b.manifest.LocalTracingType),
))

if b.manifest.PushTrace {
log.Println("reading trace push config")
if pushConfig, err := trace.GetPushConfigFromEnv(); err == nil {
log.Print("Setting up trace push config")
for _, node := range b.Nodes() {
if err = node.Instance.SetEnvironmentVariable(trace.PushBucketName, pushConfig.BucketName); err != nil {
return fmt.Errorf("failed to set TRACE_PUSH_BUCKET_NAME: %v", err)
}
if err = node.Instance.SetEnvironmentVariable(trace.PushRegion, pushConfig.Region); err != nil {
return fmt.Errorf("failed to set TRACE_PUSH_REGION: %v", err)
}
if err = node.Instance.SetEnvironmentVariable(trace.PushAccessKey, pushConfig.AccessKey); err != nil {
return fmt.Errorf("failed to set TRACE_PUSH_ACCESS_KEY: %v", err)
}
if err = node.Instance.SetEnvironmentVariable(trace.PushKey, pushConfig.SecretKey); err != nil {
return fmt.Errorf("failed to set TRACE_PUSH_SECRET_KEY: %v", err)
}
if err = node.Instance.SetEnvironmentVariable(trace.PushDelay, fmt.Sprintf("%d", pushConfig.PushDelay)); err != nil {
return fmt.Errorf("failed to set TRACE_PUSH_DELAY: %v", err)
}
}
}
}
return nil
}

Expand Down
8 changes: 7 additions & 1 deletion test/e2e/benchmark/manifest.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,13 @@ type Manifest struct {
// consensus parameters
MaxBlockBytes int64

// other configs
// LocalTracingType can be "local" or "noop"
LocalTracingType string
PushTrace bool
// download traces from the s3 bucket
// only available when PushTrace is enabled
DownloadTraces bool

UpgradeHeight int64
GovMaxSquareSize int64
}
Expand Down
24 changes: 24 additions & 0 deletions test/e2e/benchmark/throughput.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/celestiaorg/celestia-app/v2/pkg/appconsts"
"github.com/celestiaorg/celestia-app/v2/test/e2e/testnet"
"github.com/celestiaorg/celestia-app/v2/test/util/testnode"
"github.com/tendermint/tendermint/pkg/trace"
)

const (
Expand Down Expand Up @@ -49,6 +50,9 @@ func E2EThroughput() error {
Prometheus: true,
GovMaxSquareSize: appconsts.DefaultGovMaxSquareSize,
MaxBlockBytes: appconsts.DefaultMaxBytes,
LocalTracingType: "local",
PushTrace: false,
Copy link
Collaborator Author

@staheri14 staheri14 May 29, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[To reviewers] If you wish to push traced data to a s3 bucket, set PushTrace to true.

DownloadTraces: false,
TestDuration: 30 * time.Second,
TxClients: 2,
}
Expand All @@ -66,6 +70,26 @@ func E2EThroughput() error {
testnet.NoError("failed to run the benchmark test", benchTest.Run())

// post test data collection and validation

// if local tracing is enabled,
// pull round state traces to confirm tracing is working as expected.
if benchTest.manifest.LocalTracingType == "local" {
if _, err := benchTest.Node(0).PullRoundStateTraces("."); err != nil {
return fmt.Errorf("failed to pull round state traces: %w", err)
}
}

// download traces from S3, if enabled
if benchTest.manifest.PushTrace && benchTest.manifest.DownloadTraces {
// download traces from S3
pushConfig, _ := trace.GetPushConfigFromEnv()
err := trace.S3Download("./traces/", benchTest.manifest.ChainID,
pushConfig)
if err != nil {
return fmt.Errorf("failed to download traces from S3: %w", err)
}
}

log.Println("Reading blockchain")
blockchain, err := testnode.ReadBlockchain(context.Background(),
benchTest.Node(0).AddressRPC())
Expand Down
44 changes: 42 additions & 2 deletions test/e2e/testnet/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ import (
"github.com/tendermint/tendermint/config"
"github.com/tendermint/tendermint/crypto"
"github.com/tendermint/tendermint/p2p"
"github.com/tendermint/tendermint/pkg/trace"
"github.com/tendermint/tendermint/pkg/trace/schema"
"github.com/tendermint/tendermint/privval"
"github.com/tendermint/tendermint/rpc/client/http"
"github.com/tendermint/tendermint/types"
Expand All @@ -24,6 +26,7 @@ const (
p2pPort = 26656
grpcPort = 9090
prometheusPort = 26660
tracingPort = 26661
dockerSrcURL = "ghcr.io/celestiaorg/celestia-app"
secp256k1Type = "secp256k1"
ed25519Type = "ed25519"
Expand All @@ -41,8 +44,23 @@ type Node struct {
SelfDelegation int64
Instance *knuu.Instance

rpcProxyPort int
grpcProxyPort int
rpcProxyPort int
grpcProxyPort int
traceProxyPort int
}

// PullRoundStateTraces retrieves the round state traces from a node.
// It will save them to the provided path.
func (n *Node) PullRoundStateTraces(path string) ([]trace.Event[schema.RoundState], error,
) {
addr := n.AddressTracing()
log.Info().Str("Address", addr).Msg("Pulling round state traces")

err := trace.GetTable(addr, schema.RoundState{}.Table(), path)
if err != nil {
return nil, fmt.Errorf("getting table: %w", err)
}
return nil, nil
}

// Resources defines the resource requirements for a Node.
Expand Down Expand Up @@ -84,6 +102,10 @@ func NewNode(
if err := instance.AddPortTCP(grpcPort); err != nil {
return nil, err
}
if err := instance.AddPortTCP(tracingPort); err != nil {
return nil, err
}

if grafana != nil {
// add support for metrics
if err := instance.SetPrometheusEndpoint(prometheusPort, fmt.Sprintf("knuu-%s", knuu.Scope()), "1m"); err != nil {
Expand Down Expand Up @@ -251,6 +273,18 @@ func (n Node) RemoteAddressRPC() (string, error) {
return fmt.Sprintf("%s:%d", ip, rpcPort), nil
}

func (n Node) AddressTracing() string {
return fmt.Sprintf("http://127.0.0.1:%d", n.traceProxyPort)
}

func (n Node) RemoteAddressTracing() (string, error) {
ip, err := n.Instance.GetIP()
if err != nil {
return "", err
}
return fmt.Sprintf("http://%s:26661", ip), nil
}

func (n Node) IsValidator() bool {
return n.SelfDelegation != 0
}
Expand Down Expand Up @@ -306,8 +340,14 @@ func (n *Node) forwardPorts() error {
return fmt.Errorf("forwarding port %d: %w", grpcPort, err)
}

traceProxyPort, err := n.Instance.PortForwardTCP(tracingPort)
if err != nil {
return fmt.Errorf("forwarding port %d: %w", tracingPort, err)
}

n.rpcProxyPort = rpcProxyPort
n.grpcProxyPort = grpcProxyPort
n.traceProxyPort = traceProxyPort

return nil
}
Expand Down
8 changes: 8 additions & 0 deletions test/e2e/testnet/setup.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,14 @@ func WithBroadcastTxs(broadcast bool) Option {
}
}

func WithLocalTracing(localTracingType string) Option {
return func(cfg *config.Config) {
cfg.Instrumentation.TraceType = localTracingType
cfg.Instrumentation.TraceBufferSize = 1000
cfg.Instrumentation.TracePullAddress = ":26661"
}
}

func WriteAddressBook(peers []string, file string) error {
book := pex.NewAddrBook(file, false)
for _, peer := range peers {
Expand Down
14 changes: 10 additions & 4 deletions test/e2e/testnet/testnet.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,9 @@ func (t *Testnet) SetConsensusMaxBlockSize(size int64) {
func (t *Testnet) CreateGenesisNode(version string, selfDelegation, upgradeHeight int64, resources Resources) error {
signerKey := t.keygen.Generate(ed25519Type)
networkKey := t.keygen.Generate(ed25519Type)
node, err := NewNode(fmt.Sprintf("val%d", len(t.nodes)), version, 0, selfDelegation, nil, signerKey, networkKey, upgradeHeight, resources, t.grafana)
node, err := NewNode(fmt.Sprintf("val%d", len(t.nodes)), version, 0,
selfDelegation, nil, signerKey, networkKey, upgradeHeight, resources,
t.grafana)
if err != nil {
return err
}
Expand All @@ -92,13 +94,14 @@ func (t *Testnet) CreateGenesisNodes(num int, version string, selfDelegation, up
func (t *Testnet) CreateTxClients(version string,
sequences int,
blobRange string,
blobPerSequence int,
resources Resources,
grpcEndpoints []string,
) error {
for i, grpcEndpoint := range grpcEndpoints {
name := fmt.Sprintf("txsim%d", i)
err := t.CreateTxClient(name, version, sequences,
blobRange, resources, grpcEndpoint)
blobRange, blobPerSequence, resources, grpcEndpoint)
if err != nil {
log.Err(err).Str("name", name).
Str("grpc endpoint", grpcEndpoint).
Expand Down Expand Up @@ -127,6 +130,7 @@ func (t *Testnet) CreateTxClient(name,
version string,
sequences int,
blobRange string,
blobPerSequence int,
resources Resources,
grpcEndpoint string,
) error {
Expand All @@ -144,7 +148,7 @@ func (t *Testnet) CreateTxClient(name,

// Create a txsim node using the key stored in the txsimKeyringDir
txsim, err := CreateTxClient(name, version, grpcEndpoint, t.seed,
sequences, blobRange, 1, resources, txsimRootDir)
sequences, blobRange, blobPerSequence, 1, resources, txsimRootDir)
if err != nil {
log.Err(err).
Str("name", name).
Expand Down Expand Up @@ -235,7 +239,9 @@ func (t *Testnet) CreateAccount(name string, tokens int64, txsimKeyringDir strin
func (t *Testnet) CreateNode(version string, startHeight, upgradeHeight int64, resources Resources) error {
signerKey := t.keygen.Generate(ed25519Type)
networkKey := t.keygen.Generate(ed25519Type)
node, err := NewNode(fmt.Sprintf("val%d", len(t.nodes)), version, startHeight, 0, nil, signerKey, networkKey, upgradeHeight, resources, t.grafana)
node, err := NewNode(fmt.Sprintf("val%d", len(t.nodes)), version,
startHeight, 0, nil, signerKey, networkKey, upgradeHeight, resources,
t.grafana)
if err != nil {
return err
}
Expand Down
3 changes: 2 additions & 1 deletion test/e2e/testnet/txsimNode.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ func CreateTxClient(
seed int64,
sequences int,
blobRange string,
blobsPerSeq int,
pollTime int,
resources Resources,
volumePath string,
Expand Down Expand Up @@ -65,7 +66,7 @@ func CreateTxClient(
fmt.Sprintf("-t %ds", pollTime),
fmt.Sprintf("-b %d ", sequences),
fmt.Sprintf("-d %d ", seed),
fmt.Sprintf("-a %d ", 5),
fmt.Sprintf("-a %d ", blobsPerSeq),
fmt.Sprintf("-s %s ", blobRange),
}

Expand Down
5 changes: 5 additions & 0 deletions test/e2e/testnet/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"math/rand"
"os"

"github.com/rs/zerolog/log"
"github.com/tendermint/tendermint/crypto"
"github.com/tendermint/tendermint/crypto/ed25519"
"github.com/tendermint/tendermint/crypto/secp256k1"
Expand Down Expand Up @@ -44,12 +45,16 @@ type GrafanaInfo struct {
}

func GetGrafanaInfoFromEnvVar() *GrafanaInfo {
log.Info().Msg("Checking Grafana environment variables")
if os.Getenv("GRAFANA_ENDPOINT") == "" ||
os.Getenv("GRAFANA_USERNAME") == "" ||
os.Getenv("GRAFANA_TOKEN") == "" {

log.Info().Msg("No Grafana environment variables found")
return nil
}

log.Info().Msg("Grafana environment variables found")
return &GrafanaInfo{
Endpoint: os.Getenv("GRAFANA_ENDPOINT"),
Username: os.Getenv("GRAFANA_USERNAME"),
Expand Down
Loading