-
Notifications
You must be signed in to change notification settings - Fork 4
/
Copy pathoracle.go
136 lines (124 loc) · 5.01 KB
/
oracle.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
package app
import (
"context"
"time"
"github.com/cosmos/cosmos-sdk/baseapp"
"github.com/cosmos/cosmos-sdk/server/types"
oraclepreblock "github.com/skip-mev/connect/v2/abci/preblock/oracle"
"github.com/skip-mev/connect/v2/abci/proposals"
"github.com/skip-mev/connect/v2/abci/strategies/aggregator"
compression "github.com/skip-mev/connect/v2/abci/strategies/codec"
"github.com/skip-mev/connect/v2/abci/strategies/currencypair"
"github.com/skip-mev/connect/v2/abci/ve"
oracleconfig "github.com/skip-mev/connect/v2/oracle/config"
"github.com/skip-mev/connect/v2/pkg/math/voteweighted"
oracleclient "github.com/skip-mev/connect/v2/service/clients/oracle"
servicemetrics "github.com/skip-mev/connect/v2/service/metrics"
)
// initializeOracle initializes the oracle client and metrics.
func (app *App) initializeOracle(appOpts types.AppOptions) (oracleclient.OracleClient, servicemetrics.Metrics, error) {
// Read general config from app-opts, and construct oracle service.
cfg, err := oracleconfig.ReadConfigFromAppOpts(appOpts)
if err != nil {
return nil, nil, err
}
// If app level instrumentation is enabled, then wrap the oracle service with a metrics client
// to get metrics on the oracle service (for ABCI++). This will allow the instrumentation to track
// latency in VerifyVoteExtension requests and more.
oracleMetrics, err := servicemetrics.NewMetricsFromConfig(cfg, app.ChainID())
if err != nil {
return nil, nil, err
}
// Create the oracle service.
oracleClient, err := oracleclient.NewPriceDaemonClientFromConfig(
cfg,
app.Logger().With("client", "oracle"),
oracleMetrics,
)
if err != nil {
return nil, nil, err
}
// Connect to the oracle service (default timeout of 5 seconds).
go func() {
app.Logger().Info("attempting to start oracle client...", "address", cfg.OracleAddress)
if err := oracleClient.Start(context.Background()); err != nil {
app.Logger().Error("failed to start oracle client", "err", err)
panic(err)
}
}()
return oracleClient, oracleMetrics, nil
}
func (app *App) initializeABCIExtensions(oracleClient oracleclient.OracleClient, oracleMetrics servicemetrics.Metrics) {
// Create the proposal handler that will be used to fill proposals with
// transactions and oracle data.
proposalHandler := proposals.NewProposalHandler(
app.Logger(),
baseapp.NoOpPrepareProposal(),
baseapp.NoOpProcessProposal(),
ve.NewDefaultValidateVoteExtensionsFn(app.StakingKeeper),
compression.NewCompressionVoteExtensionCodec(
compression.NewDefaultVoteExtensionCodec(),
compression.NewZLibCompressor(),
),
compression.NewCompressionExtendedCommitCodec(
compression.NewDefaultExtendedCommitCodec(),
compression.NewZStdCompressor(),
),
currencypair.NewDeltaCurrencyPairStrategy(app.OracleKeeper),
oracleMetrics,
)
app.SetPrepareProposal(proposalHandler.PrepareProposalHandler())
app.SetProcessProposal(proposalHandler.ProcessProposalHandler())
// Create the aggregation function that will be used to aggregate oracle data
// from each validator.
aggregatorFn := voteweighted.MedianFromContext(
app.Logger(),
app.StakingKeeper,
voteweighted.DefaultPowerThreshold,
)
veCodec := compression.NewCompressionVoteExtensionCodec(
compression.NewDefaultVoteExtensionCodec(),
compression.NewZLibCompressor(),
)
ecCodec := compression.NewCompressionExtendedCommitCodec(
compression.NewDefaultExtendedCommitCodec(),
compression.NewZStdCompressor(),
)
// Create the pre-finalize block hook that will be used to apply oracle data
// to the state before any transactions are executed (in finalize block).
oraclePreBlockHandler := oraclepreblock.NewOraclePreBlockHandler(
app.Logger(),
aggregatorFn,
app.OracleKeeper,
oracleMetrics,
currencypair.NewDeltaCurrencyPairStrategy(app.OracleKeeper), // IMPORTANT: always construct new currency pair strategy objects when functions require them as arguments.
veCodec,
ecCodec,
)
app.SetPreBlocker(oraclePreBlockHandler.WrappedPreBlocker(app.ModuleManager))
// Create the vote extensions handler that will be used to extend and verify
// vote extensions (i.e. oracle data).
voteExtensionsHandler := ve.NewVoteExtensionHandler(
app.Logger(),
oracleClient,
time.Second, // timeout
currencypair.NewDeltaCurrencyPairStrategy(app.OracleKeeper), // IMPORTANT: always construct new currency pair strategy objects when functions require them as arguments.
veCodec,
aggregator.NewOraclePriceApplier(
aggregator.NewDefaultVoteAggregator(
app.Logger(),
aggregatorFn,
// we need a separate price strategy here, so that we can optimistically apply the latest prices
// and extend our vote based on these prices
currencypair.NewDeltaCurrencyPairStrategy(app.OracleKeeper), // IMPORTANT: always construct new currency pair strategy objects when functions require them as arguments.
),
app.OracleKeeper,
veCodec,
ecCodec,
app.Logger(),
),
oracleMetrics,
)
app.SetExtendVoteHandler(voteExtensionsHandler.ExtendVoteHandler())
app.SetVerifyVoteExtensionHandler(voteExtensionsHandler.VerifyVoteExtensionHandler())
}