-
Notifications
You must be signed in to change notification settings - Fork 3.4k
/
Copy pathl1_beacon_client.go
327 lines (284 loc) · 10.3 KB
/
l1_beacon_client.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
package sources
import (
"context"
"encoding/json"
"errors"
"fmt"
"io"
"net/http"
"net/url"
"path"
"strconv"
"sync"
"github.com/ethereum/go-ethereum"
"github.com/ethereum/go-ethereum/crypto/kzg4844"
"github.com/ethereum-optimism/optimism/op-service/client"
"github.com/ethereum-optimism/optimism/op-service/eth"
)
const (
versionMethod = "eth/v1/node/version"
specMethod = "eth/v1/config/spec"
genesisMethod = "eth/v1/beacon/genesis"
sidecarsMethodPrefix = "eth/v1/beacon/blob_sidecars/"
)
type L1BeaconClientConfig struct {
FetchAllSidecars bool
}
// L1BeaconClient is a high level golang client for the Beacon API.
type L1BeaconClient struct {
cl BeaconClient
pool *ClientPool[BlobSideCarsFetcher]
cfg L1BeaconClientConfig
initLock sync.Mutex
timeToSlotFn TimeToSlotFn
}
// BeaconClient is a thin wrapper over the Beacon APIs.
//
//go:generate mockery --name BeaconClient --with-expecter=true
type BeaconClient interface {
NodeVersion(ctx context.Context) (string, error)
ConfigSpec(ctx context.Context) (eth.APIConfigResponse, error)
BeaconGenesis(ctx context.Context) (eth.APIGenesisResponse, error)
BeaconBlobSideCars(ctx context.Context, fetchAllSidecars bool, slot uint64, hashes []eth.IndexedBlobHash) (eth.APIGetBlobSidecarsResponse, error)
}
// BlobSideCarsFetcher is a thin wrapper over the Beacon APIs.
//
//go:generate mockery --name BlobSideCarsFetcher --with-expecter=true
type BlobSideCarsFetcher interface {
BeaconBlobSideCars(ctx context.Context, fetchAllSidecars bool, slot uint64, hashes []eth.IndexedBlobHash) (eth.APIGetBlobSidecarsResponse, error)
}
// BeaconHTTPClient implements BeaconClient. It provides golang types over the basic Beacon API.
type BeaconHTTPClient struct {
cl client.HTTP
}
func NewBeaconHTTPClient(cl client.HTTP) *BeaconHTTPClient {
return &BeaconHTTPClient{cl}
}
func (cl *BeaconHTTPClient) apiReq(ctx context.Context, dest any, reqPath string, reqQuery url.Values) error {
headers := http.Header{}
headers.Add("Accept", "application/json")
resp, err := cl.cl.Get(ctx, reqPath, reqQuery, headers)
if err != nil {
return fmt.Errorf("http Get failed: %w", err)
}
if resp.StatusCode == http.StatusNotFound {
errMsg, _ := io.ReadAll(resp.Body)
_ = resp.Body.Close()
return fmt.Errorf("failed request with status %d: %s: %w", resp.StatusCode, string(errMsg), ethereum.NotFound)
} else if resp.StatusCode != http.StatusOK {
errMsg, _ := io.ReadAll(resp.Body)
_ = resp.Body.Close()
return fmt.Errorf("failed request with status %d: %s", resp.StatusCode, string(errMsg))
}
if err := json.NewDecoder(resp.Body).Decode(dest); err != nil {
_ = resp.Body.Close()
return err
}
if err := resp.Body.Close(); err != nil {
return fmt.Errorf("failed to close response body: %w", err)
}
return nil
}
func (cl *BeaconHTTPClient) NodeVersion(ctx context.Context) (string, error) {
var resp eth.APIVersionResponse
if err := cl.apiReq(ctx, &resp, versionMethod, nil); err != nil {
return "", err
}
return resp.Data.Version, nil
}
func (cl *BeaconHTTPClient) ConfigSpec(ctx context.Context) (eth.APIConfigResponse, error) {
var configResp eth.APIConfigResponse
if err := cl.apiReq(ctx, &configResp, specMethod, nil); err != nil {
return eth.APIConfigResponse{}, err
}
return configResp, nil
}
func (cl *BeaconHTTPClient) BeaconGenesis(ctx context.Context) (eth.APIGenesisResponse, error) {
var genesisResp eth.APIGenesisResponse
if err := cl.apiReq(ctx, &genesisResp, genesisMethod, nil); err != nil {
return eth.APIGenesisResponse{}, err
}
return genesisResp, nil
}
func (cl *BeaconHTTPClient) BeaconBlobSideCars(ctx context.Context, fetchAllSidecars bool, slot uint64, hashes []eth.IndexedBlobHash) (eth.APIGetBlobSidecarsResponse, error) {
reqPath := path.Join(sidecarsMethodPrefix, strconv.FormatUint(slot, 10))
var reqQuery url.Values
if !fetchAllSidecars {
reqQuery = url.Values{}
for i := range hashes {
reqQuery.Add("indices", strconv.FormatUint(hashes[i].Index, 10))
}
}
var resp eth.APIGetBlobSidecarsResponse
if err := cl.apiReq(ctx, &resp, reqPath, reqQuery); err != nil {
return eth.APIGetBlobSidecarsResponse{}, err
}
indices := make(map[uint64]struct{}, len(hashes))
for _, h := range hashes {
indices[h.Index] = struct{}{}
}
for _, apisc := range resp.Data {
delete(indices, uint64(apisc.Index))
}
if len(indices) > 0 {
return eth.APIGetBlobSidecarsResponse{}, fmt.Errorf("#returned blobs(%d) != #requested blobs(%d)", len(hashes)-len(indices), len(hashes))
}
return resp, nil
}
type ClientPool[T any] struct {
clients []T
index int
}
func NewClientPool[T any](clients ...T) *ClientPool[T] {
return &ClientPool[T]{
clients: clients,
index: 0,
}
}
func (p *ClientPool[T]) Len() int {
return len(p.clients)
}
func (p *ClientPool[T]) Get() T {
return p.clients[p.index]
}
func (p *ClientPool[T]) MoveToNext() {
p.index += 1
if p.index == len(p.clients) {
p.index = 0
}
}
// NewL1BeaconClient returns a client for making requests to an L1 consensus layer node.
// Fallbacks are optional clients that will be used for fetching blobs. L1BeaconClient will rotate between
// the `cl` and the fallbacks whenever a client runs into an error while fetching blobs.
func NewL1BeaconClient(cl BeaconClient, cfg L1BeaconClientConfig, fallbacks ...BlobSideCarsFetcher) *L1BeaconClient {
cs := append([]BlobSideCarsFetcher{cl}, fallbacks...)
return &L1BeaconClient{
cl: cl,
pool: NewClientPool(cs...),
cfg: cfg,
}
}
type TimeToSlotFn func(timestamp uint64) (uint64, error)
// GetTimeToSlotFn returns a function that converts a timestamp to a slot number.
func (cl *L1BeaconClient) GetTimeToSlotFn(ctx context.Context) (TimeToSlotFn, error) {
cl.initLock.Lock()
defer cl.initLock.Unlock()
if cl.timeToSlotFn != nil {
return cl.timeToSlotFn, nil
}
genesis, err := cl.cl.BeaconGenesis(ctx)
if err != nil {
return nil, err
}
config, err := cl.cl.ConfigSpec(ctx)
if err != nil {
return nil, err
}
genesisTime := uint64(genesis.Data.GenesisTime)
secondsPerSlot := uint64(config.Data.SecondsPerSlot)
if secondsPerSlot == 0 {
return nil, fmt.Errorf("got bad value for seconds per slot: %v", config.Data.SecondsPerSlot)
}
cl.timeToSlotFn = func(timestamp uint64) (uint64, error) {
if timestamp < genesisTime {
return 0, fmt.Errorf("provided timestamp (%v) precedes genesis time (%v)", timestamp, genesisTime)
}
return (timestamp - genesisTime) / secondsPerSlot, nil
}
return cl.timeToSlotFn, nil
}
func (cl *L1BeaconClient) fetchSidecars(ctx context.Context, slot uint64, hashes []eth.IndexedBlobHash) (eth.APIGetBlobSidecarsResponse, error) {
var errs []error
for i := 0; i < cl.pool.Len(); i++ {
f := cl.pool.Get()
resp, err := f.BeaconBlobSideCars(ctx, cl.cfg.FetchAllSidecars, slot, hashes)
if err != nil {
cl.pool.MoveToNext()
errs = append(errs, err)
} else {
return resp, nil
}
}
return eth.APIGetBlobSidecarsResponse{}, errors.Join(errs...)
}
// GetBlobSidecars fetches blob sidecars that were confirmed in the specified
// L1 block with the given indexed hashes.
// Order of the returned sidecars is guaranteed to be that of the hashes.
// Blob data is not checked for validity.
func (cl *L1BeaconClient) GetBlobSidecars(ctx context.Context, ref eth.L1BlockRef, hashes []eth.IndexedBlobHash) ([]*eth.BlobSidecar, error) {
if len(hashes) == 0 {
return []*eth.BlobSidecar{}, nil
}
slotFn, err := cl.GetTimeToSlotFn(ctx)
if err != nil {
return nil, fmt.Errorf("failed to get time to slot function: %w", err)
}
slot, err := slotFn(ref.Time)
if err != nil {
return nil, fmt.Errorf("error in converting ref.Time to slot: %w", err)
}
resp, err := cl.fetchSidecars(ctx, slot, hashes)
if err != nil {
return nil, fmt.Errorf("failed to fetch blob sidecars for slot %v block %v: %w", slot, ref, err)
}
apiscs := make([]*eth.APIBlobSidecar, 0, len(hashes))
// filter and order by hashes
for _, h := range hashes {
for _, apisc := range resp.Data {
if h.Index == uint64(apisc.Index) {
apiscs = append(apiscs, apisc)
break
}
}
}
if len(hashes) != len(apiscs) {
return nil, fmt.Errorf("expected %v sidecars but got %v", len(hashes), len(apiscs))
}
bscs := make([]*eth.BlobSidecar, 0, len(hashes))
for _, apisc := range apiscs {
bscs = append(bscs, apisc.BlobSidecar())
}
return bscs, nil
}
// GetBlobs fetches blobs that were confirmed in the specified L1 block with the given indexed
// hashes. The order of the returned blobs will match the order of `hashes`. Confirms each
// blob's validity by checking its proof against the commitment, and confirming the commitment
// hashes to the expected value. Returns error if any blob is found invalid.
func (cl *L1BeaconClient) GetBlobs(ctx context.Context, ref eth.L1BlockRef, hashes []eth.IndexedBlobHash) ([]*eth.Blob, error) {
blobSidecars, err := cl.GetBlobSidecars(ctx, ref, hashes)
if err != nil {
return nil, fmt.Errorf("failed to get blob sidecars for L1BlockRef %s: %w", ref, err)
}
blobs, err := blobsFromSidecars(blobSidecars, hashes)
if err != nil {
return nil, fmt.Errorf("failed to get blobs from sidecars for L1BlockRef %s: %w", ref, err)
}
return blobs, nil
}
func blobsFromSidecars(blobSidecars []*eth.BlobSidecar, hashes []eth.IndexedBlobHash) ([]*eth.Blob, error) {
if len(blobSidecars) != len(hashes) {
return nil, fmt.Errorf("number of hashes and blobSidecars mismatch, %d != %d", len(hashes), len(blobSidecars))
}
out := make([]*eth.Blob, len(hashes))
for i, ih := range hashes {
sidecar := blobSidecars[i]
if sidx := uint64(sidecar.Index); sidx != ih.Index {
return nil, fmt.Errorf("expected sidecars to be ordered by hashes, but got %d != %d", sidx, ih.Index)
}
// make sure the blob's kzg commitment hashes to the expected value
hash := eth.KZGToVersionedHash(kzg4844.Commitment(sidecar.KZGCommitment))
if hash != ih.Hash {
return nil, fmt.Errorf("expected hash %s for blob at index %d but got %s", ih.Hash, ih.Index, hash)
}
// confirm blob data is valid by verifying its proof against the commitment
if err := eth.VerifyBlobProof(&sidecar.Blob, kzg4844.Commitment(sidecar.KZGCommitment), kzg4844.Proof(sidecar.KZGProof)); err != nil {
return nil, fmt.Errorf("blob at index %d failed verification: %w", i, err)
}
out[i] = &sidecar.Blob
}
return out, nil
}
// GetVersion fetches the version of the Beacon-node.
func (cl *L1BeaconClient) GetVersion(ctx context.Context) (string, error) {
return cl.cl.NodeVersion(ctx)
}