-
Notifications
You must be signed in to change notification settings - Fork 958
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
feat(edssser): introduce EDS Store Stresser and cel-shed utility (#2482)
- Loading branch information
Showing
4 changed files
with
351 additions
and
4 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,165 @@ | ||
package main | ||
|
||
import ( | ||
"context" | ||
"errors" | ||
_ "expvar" | ||
"fmt" | ||
"math" | ||
"net/http" | ||
"os" | ||
"time" | ||
|
||
logging "github.com/ipfs/go-log/v2" | ||
"github.com/mitchellh/go-homedir" | ||
"github.com/pyroscope-io/client/pyroscope" | ||
"github.com/spf13/cobra" | ||
|
||
"github.com/celestiaorg/celestia-node/libs/edssser" | ||
"github.com/celestiaorg/celestia-node/nodebuilder" | ||
"github.com/celestiaorg/celestia-node/nodebuilder/node" | ||
) | ||
|
||
const ( | ||
edsStorePathFlag = "path" | ||
edsWritesFlag = "writes" | ||
edsSizeFlag = "size" | ||
edsDisableLogFlag = "disable-log" | ||
edsLogStatFreqFlag = "log-stat-freq" | ||
edsCleanupFlag = "cleanup" | ||
edsFreshStartFlag = "fresh" | ||
|
||
pyroscopeEndpointFlag = "pyroscope" | ||
putTimeoutFlag = "timeout" | ||
badgerLogLevelFlag = "badger-log-level" | ||
) | ||
|
||
func init() { | ||
edsStoreCmd.AddCommand(edsStoreStress) | ||
|
||
defaultPath := "~/.edssser" | ||
path, err := homedir.Expand(defaultPath) | ||
if err != nil { | ||
panic(err) | ||
} | ||
|
||
pathFlagUsage := fmt.Sprintf("Directory path to use for stress test. Uses %s by default.", defaultPath) | ||
edsStoreStress.Flags().String(edsStorePathFlag, path, pathFlagUsage) | ||
edsStoreStress.Flags().String(pyroscopeEndpointFlag, "", | ||
"Pyroscope address. If no address provided, pyroscope will be disabled") | ||
edsStoreStress.Flags().Int(edsWritesFlag, math.MaxInt, "Total EDS writes to make. MaxInt by default.") | ||
edsStoreStress.Flags().Int(edsSizeFlag, 128, "Chooses EDS size. 128 by default.") | ||
edsStoreStress.Flags().Bool(edsDisableLogFlag, false, "Disables logging. Enabled by default.") | ||
edsStoreStress.Flags().Int(edsLogStatFreqFlag, 10, "Write statistic logging frequency. 10 by default.") | ||
edsStoreStress.Flags().Bool(edsCleanupFlag, false, "Cleans up the store on stop. Disabled by default.") | ||
edsStoreStress.Flags().Bool(edsFreshStartFlag, false, "Cleanup previous state on start. Disabled by default.") | ||
edsStoreStress.Flags().Int(putTimeoutFlag, 30, "Sets put timeout in seconds. 30 sec by default.") | ||
edsStoreStress.Flags().String(badgerLogLevelFlag, "INFO", "Badger log level, Defaults to INFO") | ||
|
||
// kill redundant print | ||
nodebuilder.PrintKeyringInfo = false | ||
} | ||
|
||
var edsStoreCmd = &cobra.Command{ | ||
Use: "eds-store [subcommand]", | ||
Short: "Collection of eds-store related utilities", | ||
} | ||
|
||
var edsStoreStress = &cobra.Command{ | ||
Use: "stress", | ||
Short: `Runs eds.Store stress test over default node.Store Datastore backend (e.g. Badger).`, | ||
SilenceUsage: true, | ||
RunE: func(cmd *cobra.Command, args []string) (err error) { | ||
// expose expvar vars over http | ||
go http.ListenAndServe(":9999", http.DefaultServeMux) //nolint:errcheck,gosec | ||
|
||
endpoint, _ := cmd.Flags().GetString(pyroscopeEndpointFlag) | ||
if endpoint != "" { | ||
_, err = pyroscope.Start(pyroscope.Config{ | ||
ApplicationName: "cel-shred.stresser", | ||
ServerAddress: endpoint, | ||
ProfileTypes: []pyroscope.ProfileType{ | ||
pyroscope.ProfileCPU, | ||
pyroscope.ProfileAllocObjects, | ||
pyroscope.ProfileAllocSpace, | ||
pyroscope.ProfileInuseObjects, | ||
pyroscope.ProfileInuseSpace, | ||
}, | ||
}) | ||
if err != nil { | ||
fmt.Printf("failed to launch pyroscope with addr: %s err: %s\n", endpoint, err.Error()) | ||
} else { | ||
fmt.Println("connected pyroscope to:", endpoint) | ||
} | ||
} | ||
|
||
path, _ := cmd.Flags().GetString(edsStorePathFlag) | ||
fmt.Printf("using %s\n", path) | ||
|
||
freshStart, _ := cmd.Flags().GetBool(edsFreshStartFlag) | ||
if freshStart { | ||
err = os.RemoveAll(path) | ||
if err != nil { | ||
return err | ||
} | ||
} | ||
|
||
cleanup, _ := cmd.Flags().GetBool(edsCleanupFlag) | ||
if cleanup { | ||
defer func() { | ||
err = errors.Join(err, os.RemoveAll(path)) | ||
}() | ||
} | ||
|
||
loglevel, _ := cmd.Flags().GetString(badgerLogLevelFlag) | ||
if err = logging.SetLogLevel("badger", loglevel); err != nil { | ||
return err | ||
} | ||
|
||
disableLog, _ := cmd.Flags().GetBool(edsDisableLogFlag) | ||
logFreq, _ := cmd.Flags().GetInt(edsLogStatFreqFlag) | ||
edsWrites, _ := cmd.Flags().GetInt(edsWritesFlag) | ||
edsSize, _ := cmd.Flags().GetInt(edsSizeFlag) | ||
putTimeout, _ := cmd.Flags().GetInt(putTimeoutFlag) | ||
|
||
cfg := edssser.Config{ | ||
EDSSize: edsSize, | ||
EDSWrites: edsWrites, | ||
EnableLog: !disableLog, | ||
LogFilePath: path, | ||
StatLogFreq: logFreq, | ||
OpTimeout: time.Duration(putTimeout) * time.Second, | ||
} | ||
|
||
err = nodebuilder.Init(*nodebuilder.DefaultConfig(node.Full), path, node.Full) | ||
if err != nil { | ||
return err | ||
} | ||
|
||
nodestore, err := nodebuilder.OpenStore(path, nil) | ||
if err != nil { | ||
return err | ||
} | ||
defer func() { | ||
err = errors.Join(err, nodestore.Close()) | ||
}() | ||
|
||
datastore, err := nodestore.Datastore() | ||
if err != nil { | ||
return err | ||
} | ||
|
||
stresser, err := edssser.NewEDSsser(path, datastore, cfg) | ||
if err != nil { | ||
return err | ||
} | ||
|
||
stats, err := stresser.Run(cmd.Context()) | ||
if !errors.Is(err, context.Canceled) { | ||
return err | ||
} | ||
|
||
fmt.Printf("%s", stats.Finalize()) | ||
return nil | ||
}, | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,172 @@ | ||
package edssser | ||
|
||
import ( | ||
"context" | ||
"errors" | ||
"fmt" | ||
"os" | ||
"sync" | ||
"testing" | ||
"time" | ||
|
||
"github.com/ipfs/go-datastore" | ||
|
||
"github.com/celestiaorg/celestia-app/pkg/da" | ||
|
||
"github.com/celestiaorg/celestia-node/share/eds" | ||
"github.com/celestiaorg/celestia-node/share/eds/edstest" | ||
) | ||
|
||
type Config struct { | ||
EDSSize int | ||
EDSWrites int | ||
EnableLog bool | ||
LogFilePath string | ||
StatLogFreq int | ||
OpTimeout time.Duration | ||
} | ||
|
||
// EDSsser stand for EDS Store Stresser. | ||
type EDSsser struct { | ||
config Config | ||
datastore datastore.Batching | ||
edsstoreMu sync.Mutex | ||
edsstore *eds.Store | ||
|
||
statsFileMu sync.Mutex | ||
statsFile *os.File | ||
} | ||
|
||
func NewEDSsser(path string, datastore datastore.Batching, cfg Config) (*EDSsser, error) { | ||
edsstore, err := eds.NewStore(path, datastore) | ||
if err != nil { | ||
return nil, err | ||
} | ||
|
||
return &EDSsser{ | ||
config: cfg, | ||
datastore: datastore, | ||
edsstore: edsstore, | ||
}, nil | ||
} | ||
|
||
func (ss *EDSsser) Run(ctx context.Context) (stats Stats, err error) { | ||
ss.edsstoreMu.Lock() | ||
defer ss.edsstoreMu.Unlock() | ||
|
||
err = ss.edsstore.Start(ctx) | ||
if err != nil { | ||
return stats, err | ||
} | ||
defer func() { | ||
err = errors.Join(err, ss.edsstore.Stop(ctx)) | ||
}() | ||
|
||
edsHashes, err := ss.edsstore.List() | ||
if err != nil { | ||
return stats, err | ||
} | ||
fmt.Printf("recovered %d EDSes\n\n", len(edsHashes)) | ||
|
||
t := &testing.T{} | ||
for toWrite := ss.config.EDSWrites - len(edsHashes); ctx.Err() == nil && toWrite > 0; toWrite-- { | ||
took, err := ss.put(ctx, t) | ||
|
||
stats.TotalWritten++ | ||
stats.TotalTime += took | ||
if took < stats.MinTime || stats.MinTime == 0 { | ||
stats.MinTime = took | ||
} else if took > stats.MaxTime { | ||
stats.MaxTime = took | ||
} | ||
|
||
if ss.config.EnableLog { | ||
if stats.TotalWritten%ss.config.StatLogFreq == 0 { | ||
stats := stats.Finalize() | ||
fmt.Println(stats) | ||
go func() { | ||
err := ss.dumpStat(stats) | ||
if err != nil { | ||
fmt.Printf("error dumping stats: %s\n", err.Error()) | ||
} | ||
}() | ||
} | ||
if err != nil { | ||
fmt.Printf("ERROR put: %s, took: %v, at: %v\n", err.Error(), took, time.Now()) | ||
continue | ||
} | ||
if took > ss.config.OpTimeout/2 { | ||
fmt.Println("long put", "size", ss.config.EDSSize, "took", took, "at", time.Now()) | ||
continue | ||
} | ||
|
||
fmt.Println("square written", "size", ss.config.EDSSize, "took", took, "at", time.Now()) | ||
} | ||
} | ||
return stats, nil | ||
} | ||
|
||
func (ss *EDSsser) dumpStat(stats Stats) (err error) { | ||
ss.statsFileMu.Lock() | ||
defer ss.statsFileMu.Unlock() | ||
|
||
ss.statsFile, err = os.Create(ss.config.LogFilePath + "/edssser_stats.txt") | ||
if err != nil { | ||
return err | ||
} | ||
|
||
_, err = ss.statsFile.Write([]byte(stats.String())) | ||
if err != nil { | ||
return err | ||
} | ||
|
||
return ss.statsFile.Close() | ||
} | ||
|
||
type Stats struct { | ||
TotalWritten int | ||
TotalTime, MinTime, MaxTime, AvgTime time.Duration | ||
// Deviation ? | ||
} | ||
|
||
func (stats Stats) Finalize() Stats { | ||
if stats.TotalTime != 0 { | ||
stats.AvgTime = stats.TotalTime / time.Duration(stats.TotalWritten) | ||
} | ||
return stats | ||
} | ||
|
||
func (stats Stats) String() string { | ||
return fmt.Sprintf(` | ||
TotalWritten %d | ||
TotalWritingTime %v | ||
MaxTime %s | ||
MinTime %s | ||
AvgTime %s | ||
`, | ||
stats.TotalWritten, | ||
stats.TotalTime, | ||
stats.MaxTime, | ||
stats.MinTime, | ||
stats.AvgTime, | ||
) | ||
} | ||
|
||
func (ss *EDSsser) put(ctx context.Context, t *testing.T) (time.Duration, error) { | ||
ctx, cancel := context.WithTimeout(ctx, ss.config.OpTimeout) | ||
if ss.config.OpTimeout == 0 { | ||
ctx, cancel = context.WithCancel(ctx) | ||
} | ||
defer cancel() | ||
|
||
// divide by 2 to get ODS size as expected by RandEDS | ||
square := edstest.RandEDS(t, ss.config.EDSSize/2) | ||
dah, err := da.NewDataAvailabilityHeader(square) | ||
if err != nil { | ||
return 0, err | ||
} | ||
|
||
now := time.Now() | ||
err = ss.edsstore.Put(ctx, dah.Hash(), square) | ||
return time.Since(now), err | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters