Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Merged by Bors] - v2alpha1 atxs stream #5566

Closed
wants to merge 29 commits into from
Closed
Changes from 1 commit
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
2381693
iterate over database
dshulyak Nov 7, 2023
bab8ac6
add half baked streaming for atxs
dshulyak Nov 7, 2023
a617a05
refactor atxs db api
dshulyak Nov 9, 2023
4e9610d
save progress
dshulyak Nov 9, 2023
a767855
register v2
dshulyak Nov 9, 2023
d30636f
support for offset and limit
dshulyak Nov 9, 2023
7ff7f67
encode the rest of the fields
dshulyak Nov 9, 2023
e609d66
Merge branch 'develop' into atxs-stream
dshulyak Nov 9, 2023
f4abc11
allow to watch
dshulyak Nov 9, 2023
0c43282
track todo
dshulyak Nov 9, 2023
2da3634
fix where
dshulyak Nov 9, 2023
82fd29d
refactor offset / limit
dshulyak Nov 9, 2023
af95f33
debug
dshulyak Nov 9, 2023
8958021
Merge branch 'develop' into v2-alpha-atxs-stream
kacpersaw Feb 8, 2024
13a10f9
Use string builder to build query
kacpersaw Feb 9, 2024
7edbb42
add matcher to stream func
kacpersaw Feb 9, 2024
e07f5fb
Move to v2alpha1 and remove headers
kacpersaw Feb 12, 2024
cea030a
Add tests
kacpersaw Feb 13, 2024
c20d397
Add ActivationsCount handler
kacpersaw Feb 13, 2024
7a6a2ea
Extract query builder into separate pkg
kacpersaw Feb 13, 2024
f90fc70
lint fix
kacpersaw Feb 13, 2024
4f77497
Move service name to const
kacpersaw Feb 13, 2024
033872e
Add unit tests & bump api version
kacpersaw Feb 15, 2024
9cef9fa
Merge branch 'develop' into v2-alpha-atxs-stream
kacpersaw Feb 16, 2024
f533c6d
Apply suggestions from code review
kacpersaw Feb 16, 2024
fe17373
Apply review suggestions
kacpersaw Feb 16, 2024
7635482
lint
kacpersaw Feb 16, 2024
1f80fa9
apply review suggestion
kacpersaw Feb 16, 2024
d87b0e8
Merge branch 'develop' into v2-alpha-atxs-stream
kacpersaw Feb 16, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Apply suggestions from code review
Co-authored-by: Bartosz Różański <[email protected]>
kacpersaw and poszu authored Feb 16, 2024
commit f533c6d5cdb8fd36633c645146b6b1c7117069eb
17 changes: 9 additions & 8 deletions api/grpcserver/v2alpha1/activation.go
Original file line number Diff line number Diff line change
@@ -41,12 +41,12 @@
spacemeshv2alpha1.RegisterActivationStreamServiceServer(server, s)
}

func (s *ActivationStreamService) RegisterHandlerService(mux *runtime.ServeMux) error {
return spacemeshv2alpha1.RegisterActivationStreamServiceHandlerServer(context.Background(), mux, s)

Check warning on line 45 in api/grpcserver/v2alpha1/activation.go

Codecov / codecov/patch

api/grpcserver/v2alpha1/activation.go#L44-L45

Added lines #L44 - L45 were not covered by tests
}

func (s *ActivationStreamService) String() string {
return "ActivationStreamService"

Check warning on line 49 in api/grpcserver/v2alpha1/activation.go

Codecov / codecov/patch

api/grpcserver/v2alpha1/activation.go#L48-L49

Added lines #L48 - L49 were not covered by tests
}

func (s *ActivationStreamService) Stream(
@@ -59,23 +59,23 @@
var err error
sub, err = events.SubscribeMatched(matcher.match)
if err != nil {
return status.Error(codes.Internal, err.Error())

Check warning on line 62 in api/grpcserver/v2alpha1/activation.go

Codecov / codecov/patch

api/grpcserver/v2alpha1/activation.go#L62

Added line #L62 was not covered by tests
}
defer sub.Close()
if err := stream.SendHeader(metadata.MD{}); err != nil {
return status.Errorf(codes.Unavailable, "can't send header")

Check warning on line 66 in api/grpcserver/v2alpha1/activation.go

Codecov / codecov/patch

api/grpcserver/v2alpha1/activation.go#L66

Added line #L66 was not covered by tests
}
}
ops, err := toOperations(toRequest(request))
if err != nil {
return status.Error(codes.InvalidArgument, err.Error())

Check warning on line 71 in api/grpcserver/v2alpha1/activation.go

Codecov / codecov/patch

api/grpcserver/v2alpha1/activation.go#L71

Added line #L71 was not covered by tests
}
var ierr error
if err := atxs.IterateAtxsOps(s.db, ops, func(atx *types.VerifiedActivationTx) bool {
ierr = stream.Send(&spacemeshv2alpha1.Activation{Versioned: &spacemeshv2alpha1.Activation_V1{V1: toAtx(atx)}})
return ierr == nil
}); err != nil {
return status.Error(codes.Internal, err.Error())

Check warning on line 78 in api/grpcserver/v2alpha1/activation.go

Codecov / codecov/patch

api/grpcserver/v2alpha1/activation.go#L78

Added line #L78 was not covered by tests
}
if sub == nil {
return nil
@@ -84,17 +84,17 @@
select {
case <-stream.Context().Done():
return nil
case <-sub.Full():
return status.Error(codes.Canceled, "buffer overflow")

Check warning on line 88 in api/grpcserver/v2alpha1/activation.go

Codecov / codecov/patch

api/grpcserver/v2alpha1/activation.go#L87-L88

Added lines #L87 - L88 were not covered by tests
case rst := <-sub.Out():
if err := stream.Send(&spacemeshv2alpha1.Activation{
err := stream.Send(&spacemeshv2alpha1.Activation{
Versioned: &spacemeshv2alpha1.Activation_V1{V1: toAtx(rst.VerifiedActivationTx)},
},
); err != nil {
if errors.Is(err, io.EOF) {
return nil
}
})
switch {
case errors.Is(err, io.EOF):
return nil
case err != nil:
return status.Error(codes.Internal, err.Error())

Check warning on line 97 in api/grpcserver/v2alpha1/activation.go

Codecov / codecov/patch

api/grpcserver/v2alpha1/activation.go#L94-L97

Added lines #L94 - L97 were not covered by tests
}
}
}
@@ -115,40 +115,40 @@
Ticks: uint32(atx.TickCount()),
}
if atx.CommitmentATX != nil {
v1.CommittmentAtx = atx.CommitmentATX.Bytes()

Check warning on line 118 in api/grpcserver/v2alpha1/activation.go

Codecov / codecov/patch

api/grpcserver/v2alpha1/activation.go#L118

Added line #L118 was not covered by tests
}
if atx.VRFNonce != nil {
v1.VrfPostIndex = &spacemeshv2alpha1.VRFPostIndex{
Nonce: uint64(*atx.VRFNonce),

Check warning on line 122 in api/grpcserver/v2alpha1/activation.go

Codecov / codecov/patch

api/grpcserver/v2alpha1/activation.go#L121-L122

Added lines #L121 - L122 were not covered by tests
}
}
if atx.InitialPost != nil {
v1.InitialPost = &spacemeshv2alpha1.Post{
Nonce: atx.InitialPost.Nonce,
Indices: atx.InitialPost.Indices,
Pow: atx.InitialPost.Pow,

Check warning on line 129 in api/grpcserver/v2alpha1/activation.go

Codecov / codecov/patch

api/grpcserver/v2alpha1/activation.go#L126-L129

Added lines #L126 - L129 were not covered by tests
}
}
if nipost := atx.NIPost; nipost != nil {
if nipost.Post != nil {
v1.Post = &spacemeshv2alpha1.Post{
Nonce: nipost.Post.Nonce,
Indices: nipost.Post.Indices,
Pow: nipost.Post.Pow,

Check warning on line 137 in api/grpcserver/v2alpha1/activation.go

Codecov / codecov/patch

api/grpcserver/v2alpha1/activation.go#L133-L137

Added lines #L133 - L137 were not covered by tests
}
}
if nipost.PostMetadata != nil {
v1.PostMeta = &spacemeshv2alpha1.PostMeta{
Challenge: nipost.PostMetadata.Challenge,
LabelsPerUnit: nipost.PostMetadata.LabelsPerUnit,

Check warning on line 143 in api/grpcserver/v2alpha1/activation.go

Codecov / codecov/patch

api/grpcserver/v2alpha1/activation.go#L140-L143

Added lines #L140 - L143 were not covered by tests
}
}
v1.Membership = &spacemeshv2alpha1.PoetMembershipProof{
ProofNodes: make([][]byte, len(nipost.Membership.Nodes)),
Leaf: nipost.Membership.LeafIndex,

Check warning on line 148 in api/grpcserver/v2alpha1/activation.go

Codecov / codecov/patch

api/grpcserver/v2alpha1/activation.go#L146-L148

Added lines #L146 - L148 were not covered by tests
}
for i, node := range nipost.Membership.Nodes {
v1.Membership.ProofNodes[i] = node.Bytes()

Check warning on line 151 in api/grpcserver/v2alpha1/activation.go

Codecov / codecov/patch

api/grpcserver/v2alpha1/activation.go#L150-L151

Added lines #L150 - L151 were not covered by tests
}
}
return v1
@@ -173,8 +173,8 @@
}

// String returns the service name.
func (s *ActivationService) String() string {
return "ActivationService"

Check warning on line 177 in api/grpcserver/v2alpha1/activation.go

Codecov / codecov/patch

api/grpcserver/v2alpha1/activation.go#L176-L177

Added lines #L176 - L177 were not covered by tests
}

func (s *ActivationService) List(
@@ -183,12 +183,13 @@
) (*spacemeshv2alpha1.ActivationList, error) {
ops, err := toOperations(request)
if err != nil {
return nil, status.Error(codes.InvalidArgument, err.Error())

Check warning on line 186 in api/grpcserver/v2alpha1/activation.go

Codecov / codecov/patch

api/grpcserver/v2alpha1/activation.go#L186

Added line #L186 was not covered by tests
}
// every full atx is ~1KB. 100 atxs is ~100KB.
if request.Limit > 100 {
switch {
case request.Limit > 100:
return nil, status.Error(codes.InvalidArgument, "limit is capped at 100")
} else if request.Limit == 0 {
case request.Limit == 0:
return nil, status.Error(codes.InvalidArgument, "limit must be set to <= 100")
}
rst := make([]*spacemeshv2alpha1.Activation, 0, request.Limit)
@@ -196,7 +197,7 @@
rst = append(rst, &spacemeshv2alpha1.Activation{Versioned: &spacemeshv2alpha1.Activation_V1{V1: toAtx(atx)}})
return true
}); err != nil {
return nil, status.Error(codes.Internal, err.Error())

Check warning on line 200 in api/grpcserver/v2alpha1/activation.go

Codecov / codecov/patch

api/grpcserver/v2alpha1/activation.go#L200

Added line #L200 was not covered by tests
}
return &spacemeshv2alpha1.ActivationList{Activations: rst}, nil
}
@@ -215,7 +216,7 @@

count, err := atxs.CountAtxsByEpoch(s.db, ops)
if err != nil {
return nil, status.Error(codes.Internal, err.Error())

Check warning on line 219 in api/grpcserver/v2alpha1/activation.go

Codecov / codecov/patch

api/grpcserver/v2alpha1/activation.go#L219

Added line #L219 was not covered by tests
}

return &spacemeshv2alpha1.ActivationsCountResponse{Count: count}, nil
@@ -234,7 +235,7 @@
func toOperations(filter *spacemeshv2alpha1.ActivationRequest) (builder.Operations, error) {
ops := builder.Operations{}
if filter == nil {
return ops, nil

Check warning on line 238 in api/grpcserver/v2alpha1/activation.go

Codecov / codecov/patch

api/grpcserver/v2alpha1/activation.go#L238

Added line #L238 was not covered by tests
}
if filter.NodeId != nil {
ops.Filter = append(ops.Filter, builder.Op{
@@ -253,7 +254,7 @@
if len(filter.Coinbase) > 0 {
addr, err := types.StringToAddress(filter.Coinbase)
if err != nil {
return builder.Operations{}, err

Check warning on line 257 in api/grpcserver/v2alpha1/activation.go

Codecov / codecov/patch

api/grpcserver/v2alpha1/activation.go#L257

Added line #L257 was not covered by tests
}
ops.Filter = append(ops.Filter, builder.Op{
Field: builder.Coinbase,
@@ -269,11 +270,11 @@
})
}
if filter.EndEpoch != 0 {
ops.Filter = append(ops.Filter, builder.Op{
Field: builder.Epoch,
Token: builder.Lte,
Value: int64(filter.EndEpoch),
})

Check warning on line 277 in api/grpcserver/v2alpha1/activation.go

Codecov / codecov/patch

api/grpcserver/v2alpha1/activation.go#L273-L277

Added lines #L273 - L277 were not covered by tests
}

ops.Other = append(ops.Other, builder.Op{
@@ -324,8 +325,8 @@
if len(m.Coinbase) > 0 {
addr, err := types.StringToAddress(m.Coinbase)
if err != nil {
ctxzap.Error(m.ctx, "unable to convert atx coinbase", zap.Error(err))
return false

Check warning on line 329 in api/grpcserver/v2alpha1/activation.go

Codecov / codecov/patch

api/grpcserver/v2alpha1/activation.go#L328-L329

Added lines #L328 - L329 were not covered by tests
}
if t.Coinbase != addr {
return false
@@ -334,13 +335,13 @@

if m.StartEpoch != 0 {
if t.PublishEpoch.Uint32() < m.StartEpoch {
return false

Check warning on line 338 in api/grpcserver/v2alpha1/activation.go

Codecov / codecov/patch

api/grpcserver/v2alpha1/activation.go#L338

Added line #L338 was not covered by tests
}
}

if m.EndEpoch != 0 {
if t.PublishEpoch.Uint32() > m.EndEpoch {
return false

Check warning on line 344 in api/grpcserver/v2alpha1/activation.go

Codecov / codecov/patch

api/grpcserver/v2alpha1/activation.go#L343-L344

Added lines #L343 - L344 were not covered by tests
}
}

15 changes: 6 additions & 9 deletions api/grpcserver/v2alpha1/activation_test.go
Original file line number Diff line number Diff line change
@@ -28,14 +28,11 @@ func TestActivationService_List(t *testing.T) {

gen := fixture.NewAtxsGenerator()
activations := make([]types.VerifiedActivationTx, 100)
require.NoError(t, db.WithTx(ctx, func(dtx *sql.Tx) error {
for i := range activations {
atx := gen.Next()
require.NoError(t, atxs.Add(dtx, atx))
activations[i] = *atx
}
return nil
}))
for i := range activations {
atx := gen.Next()
require.NoError(t, atxs.Add(db, atx))
activations[i] = *atx
}

svc := NewActivationService(db)
cfg, cleanup := launchServer(t, svc)
@@ -70,7 +67,7 @@ func TestActivationService_List(t *testing.T) {
Offset: 50,
})
require.NoError(t, err)
require.Equal(t, 25, len(list.Activations))
require.Len(t, list.Activations, 25)
})

t.Run("all", func(t *testing.T) {
7 changes: 3 additions & 4 deletions sql/builder/builder.go
Original file line number Diff line number Diff line change
@@ -51,10 +51,9 @@

for i, op := range operations.Filter {
if i == 0 {
queryBuilder.WriteString(" " + string(Where))
}
if i != 0 {
queryBuilder.WriteString(" " + string(And))
queryBuilder.WriteString(" where")
} else {
queryBuilder.WriteString(" and")
}
queryBuilder.WriteString(" " + string(op.Field) + " " + string(op.Token) + " ?" + strconv.Itoa(i+1))
}
@@ -74,8 +73,8 @@
stmt.BindInt64(i+1, value)
case []byte:
stmt.BindBytes(i+1, value)
default:
panic(fmt.Sprintf("unexpected type %T", value))

Check warning on line 77 in sql/builder/builder.go

Codecov / codecov/patch

sql/builder/builder.go#L76-L77

Added lines #L76 - L77 were not covered by tests
}
}
}

Unchanged files with check annotations Beta

signer, err := signing.NewEdSigner()
if err != nil {
log.Println("failed to create signer:", err)
os.Exit(1)

Check warning on line 59 in common/fixture/atxs.go

Codecov / codecov/patch

common/fixture/atxs.go#L58-L59

Added lines #L58 - L59 were not covered by tests
}
atx = types.VerifiedActivationTx{
if atx != nil {
return fn(atx)
}
derr = err
return derr == nil

Check warning on line 523 in sql/atxs/atxs.go

Codecov / codecov/patch

sql/atxs/atxs.go#L522-L523

Added lines #L522 - L523 were not covered by tests
}))
if err != nil {
return err

Check warning on line 526 in sql/atxs/atxs.go

Codecov / codecov/patch

sql/atxs/atxs.go#L526

Added line #L526 was not covered by tests
}
return derr
}