Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Merged by Bors] - v2alpha1 atxs stream #5566

Closed
wants to merge 29 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
2381693
iterate over database
dshulyak Nov 7, 2023
bab8ac6
add half baked streaming for atxs
dshulyak Nov 7, 2023
a617a05
refactor atxs db api
dshulyak Nov 9, 2023
4e9610d
save progress
dshulyak Nov 9, 2023
a767855
register v2
dshulyak Nov 9, 2023
d30636f
support for offset and limit
dshulyak Nov 9, 2023
7ff7f67
encode the rest of the fields
dshulyak Nov 9, 2023
e609d66
Merge branch 'develop' into atxs-stream
dshulyak Nov 9, 2023
f4abc11
allow to watch
dshulyak Nov 9, 2023
0c43282
track todo
dshulyak Nov 9, 2023
2da3634
fix where
dshulyak Nov 9, 2023
82fd29d
refactor offset / limit
dshulyak Nov 9, 2023
af95f33
debug
dshulyak Nov 9, 2023
8958021
Merge branch 'develop' into v2-alpha-atxs-stream
kacpersaw Feb 8, 2024
13a10f9
Use string builder to build query
kacpersaw Feb 9, 2024
7edbb42
add matcher to stream func
kacpersaw Feb 9, 2024
e07f5fb
Move to v2alpha1 and remove headers
kacpersaw Feb 12, 2024
cea030a
Add tests
kacpersaw Feb 13, 2024
c20d397
Add ActivationsCount handler
kacpersaw Feb 13, 2024
7a6a2ea
Extract query builder into separate pkg
kacpersaw Feb 13, 2024
f90fc70
lint fix
kacpersaw Feb 13, 2024
4f77497
Move service name to const
kacpersaw Feb 13, 2024
033872e
Add unit tests & bump api version
kacpersaw Feb 15, 2024
9cef9fa
Merge branch 'develop' into v2-alpha-atxs-stream
kacpersaw Feb 16, 2024
f533c6d
Apply suggestions from code review
kacpersaw Feb 16, 2024
fe17373
Apply review suggestions
kacpersaw Feb 16, 2024
7635482
lint
kacpersaw Feb 16, 2024
1f80fa9
apply review suggestion
kacpersaw Feb 16, 2024
d87b0e8
Merge branch 'develop' into v2-alpha-atxs-stream
kacpersaw Feb 16, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 13 additions & 11 deletions api/grpcserver/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,23 +27,25 @@ type Config struct {
type Service = string

const (
Admin Service = "admin"
Debug Service = "debug"
GlobalState Service = "global"
Mesh Service = "mesh"
Transaction Service = "transaction"
Activation Service = "activation"
Smesher Service = "smesher"
Post Service = "post"
Node Service = "node"
Admin Service = "admin"
Debug Service = "debug"
GlobalState Service = "global"
Mesh Service = "mesh"
Transaction Service = "transaction"
Activation Service = "activation"
Smesher Service = "smesher"
Post Service = "post"
Node Service = "node"
ActivationV2Alpha1 Service = "activation_v2alpha1"
ActivationStreamV2Alpha1 Service = "activation_stream_v2alpha1"
)

// DefaultConfig defines the default configuration options for api.
func DefaultConfig() Config {
return Config{
PublicServices: []Service{GlobalState, Mesh, Transaction, Node, Activation},
PublicServices: []Service{GlobalState, Mesh, Transaction, Node, Activation, ActivationV2Alpha1},
PublicListener: "0.0.0.0:9092",
PrivateServices: []Service{Admin, Smesher, Debug},
PrivateServices: []Service{Admin, Smesher, Debug, ActivationStreamV2Alpha1},
poszu marked this conversation as resolved.
Show resolved Hide resolved
PrivateListener: "127.0.0.1:9093",
PostServices: []Service{Post},
PostListener: "127.0.0.1:9094",
Expand Down
357 changes: 357 additions & 0 deletions api/grpcserver/v2alpha1/activation.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,357 @@
package v2alpha1

import (
"context"
"errors"
"fmt"
"io"

"github.com/grpc-ecosystem/go-grpc-middleware/logging/zap/ctxzap"
"github.com/grpc-ecosystem/grpc-gateway/v2/runtime"
spacemeshv2alpha1 "github.com/spacemeshos/api/release/go/spacemesh/v2alpha1"
"go.uber.org/zap"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/status"

"github.com/spacemeshos/go-spacemesh/common/types"
"github.com/spacemeshos/go-spacemesh/events"
"github.com/spacemeshos/go-spacemesh/sql"
"github.com/spacemeshos/go-spacemesh/sql/atxs"
"github.com/spacemeshos/go-spacemesh/sql/builder"
)

const (
Activation = "activation_v2alpha1"
ActivationStream = "activation_stream_v2alpha1"
)

func NewActivationStreamService(db sql.Executor) *ActivationStreamService {
return &ActivationStreamService{db: db}
}

type ActivationStreamService struct {
db sql.Executor
}

func (s *ActivationStreamService) RegisterService(server *grpc.Server) {
spacemeshv2alpha1.RegisterActivationStreamServiceServer(server, s)
}

func (s *ActivationStreamService) RegisterHandlerService(mux *runtime.ServeMux) error {
return spacemeshv2alpha1.RegisterActivationStreamServiceHandlerServer(context.Background(), mux, s)

Check warning on line 43 in api/grpcserver/v2alpha1/activation.go

View check run for this annotation

Codecov / codecov/patch

api/grpcserver/v2alpha1/activation.go#L42-L43

Added lines #L42 - L43 were not covered by tests
}

func (s *ActivationStreamService) String() string {
return "ActivationStreamService"

Check warning on line 47 in api/grpcserver/v2alpha1/activation.go

View check run for this annotation

Codecov / codecov/patch

api/grpcserver/v2alpha1/activation.go#L46-L47

Added lines #L46 - L47 were not covered by tests
}

func (s *ActivationStreamService) Stream(
request *spacemeshv2alpha1.ActivationStreamRequest,
stream spacemeshv2alpha1.ActivationStreamService_StreamServer,
) error {
var sub *events.BufferedSubscription[events.ActivationTx]
if request.Watch {
matcher := resultsMatcher{request, stream.Context()}
var err error
sub, err = events.SubscribeMatched(matcher.match)
if err != nil {
return status.Error(codes.Internal, err.Error())

Check warning on line 60 in api/grpcserver/v2alpha1/activation.go

View check run for this annotation

Codecov / codecov/patch

api/grpcserver/v2alpha1/activation.go#L60

Added line #L60 was not covered by tests
}
defer sub.Close()
if err := stream.SendHeader(metadata.MD{}); err != nil {
return status.Errorf(codes.Unavailable, "can't send header")

Check warning on line 64 in api/grpcserver/v2alpha1/activation.go

View check run for this annotation

Codecov / codecov/patch

api/grpcserver/v2alpha1/activation.go#L64

Added line #L64 was not covered by tests
}
}
ops, err := toOperations(toRequest(request))
if err != nil {
return status.Error(codes.InvalidArgument, err.Error())

Check warning on line 69 in api/grpcserver/v2alpha1/activation.go

View check run for this annotation

Codecov / codecov/patch

api/grpcserver/v2alpha1/activation.go#L69

Added line #L69 was not covered by tests
}
var ierr error
if err := atxs.IterateAtxsOps(s.db, ops, func(atx *types.VerifiedActivationTx) bool {
ierr = stream.Send(&spacemeshv2alpha1.Activation{Versioned: &spacemeshv2alpha1.Activation_V1{V1: toAtx(atx)}})
return ierr == nil
}); err != nil {
return status.Error(codes.Internal, err.Error())

Check warning on line 76 in api/grpcserver/v2alpha1/activation.go

View check run for this annotation

Codecov / codecov/patch

api/grpcserver/v2alpha1/activation.go#L76

Added line #L76 was not covered by tests
}
if sub == nil {
return nil
}
poszu marked this conversation as resolved.
Show resolved Hide resolved
for {
select {
case <-stream.Context().Done():
return nil
case <-sub.Full():
return status.Error(codes.Canceled, "buffer overflow")

Check warning on line 86 in api/grpcserver/v2alpha1/activation.go

View check run for this annotation

Codecov / codecov/patch

api/grpcserver/v2alpha1/activation.go#L85-L86

Added lines #L85 - L86 were not covered by tests
case rst := <-sub.Out():
err := stream.Send(&spacemeshv2alpha1.Activation{
Versioned: &spacemeshv2alpha1.Activation_V1{V1: toAtx(rst.VerifiedActivationTx)},
})
switch {
case errors.Is(err, io.EOF):
return nil
case err != nil:
return status.Error(codes.Internal, err.Error())

Check warning on line 95 in api/grpcserver/v2alpha1/activation.go

View check run for this annotation

Codecov / codecov/patch

api/grpcserver/v2alpha1/activation.go#L92-L95

Added lines #L92 - L95 were not covered by tests
}
}
}
}

func toAtx(atx *types.VerifiedActivationTx) *spacemeshv2alpha1.ActivationV1 {
v1 := &spacemeshv2alpha1.ActivationV1{
Id: atx.ID().Bytes(),
NodeId: atx.SmesherID.Bytes(),
Signature: atx.Signature.Bytes(),
PublishEpoch: atx.PublishEpoch.Uint32(),
Sequence: atx.Sequence,
PreviousAtx: atx.PrevATXID[:],
PositioningAtx: atx.PositioningATX[:],
Coinbase: atx.Coinbase.String(),
Units: atx.NumUnits,
BaseHeight: uint32(atx.BaseTickHeight()),
Ticks: uint32(atx.TickCount()),
}
if atx.CommitmentATX != nil {
v1.CommittmentAtx = atx.CommitmentATX.Bytes()

Check warning on line 116 in api/grpcserver/v2alpha1/activation.go

View check run for this annotation

Codecov / codecov/patch

api/grpcserver/v2alpha1/activation.go#L116

Added line #L116 was not covered by tests
}
if atx.VRFNonce != nil {
v1.VrfPostIndex = &spacemeshv2alpha1.VRFPostIndex{
Nonce: uint64(*atx.VRFNonce),

Check warning on line 120 in api/grpcserver/v2alpha1/activation.go

View check run for this annotation

Codecov / codecov/patch

api/grpcserver/v2alpha1/activation.go#L119-L120

Added lines #L119 - L120 were not covered by tests
}
}
if atx.InitialPost != nil {
v1.InitialPost = &spacemeshv2alpha1.Post{
Nonce: atx.InitialPost.Nonce,
Indices: atx.InitialPost.Indices,
Pow: atx.InitialPost.Pow,

Check warning on line 127 in api/grpcserver/v2alpha1/activation.go

View check run for this annotation

Codecov / codecov/patch

api/grpcserver/v2alpha1/activation.go#L124-L127

Added lines #L124 - L127 were not covered by tests
}
}

if atx.NIPost == nil {
panic(fmt.Sprintf("nil nipost for atx %s", atx.ShortString()))

Check warning on line 132 in api/grpcserver/v2alpha1/activation.go

View check run for this annotation

Codecov / codecov/patch

api/grpcserver/v2alpha1/activation.go#L132

Added line #L132 was not covered by tests
}

if atx.NIPost.Post == nil {
panic(fmt.Sprintf("nil nipost post for atx %s", atx.ShortString()))

Check warning on line 136 in api/grpcserver/v2alpha1/activation.go

View check run for this annotation

Codecov / codecov/patch

api/grpcserver/v2alpha1/activation.go#L136

Added line #L136 was not covered by tests
}

if atx.NIPost.PostMetadata == nil {
panic(fmt.Sprintf("nil nipost post metadata for atx %s", atx.ShortString()))

Check warning on line 140 in api/grpcserver/v2alpha1/activation.go

View check run for this annotation

Codecov / codecov/patch

api/grpcserver/v2alpha1/activation.go#L140

Added line #L140 was not covered by tests
}

nipost := atx.NIPost
v1.Post = &spacemeshv2alpha1.Post{
Nonce: nipost.Post.Nonce,
Indices: nipost.Post.Indices,
Pow: nipost.Post.Pow,
}

v1.PostMeta = &spacemeshv2alpha1.PostMeta{
Challenge: nipost.PostMetadata.Challenge,
LabelsPerUnit: nipost.PostMetadata.LabelsPerUnit,
}

v1.Membership = &spacemeshv2alpha1.PoetMembershipProof{
ProofNodes: make([][]byte, len(nipost.Membership.Nodes)),
Leaf: nipost.Membership.LeafIndex,
}

for i, node := range nipost.Membership.Nodes {
v1.Membership.ProofNodes[i] = node.Bytes()

Check warning on line 161 in api/grpcserver/v2alpha1/activation.go

View check run for this annotation

Codecov / codecov/patch

api/grpcserver/v2alpha1/activation.go#L161

Added line #L161 was not covered by tests
}

return v1
}

func NewActivationService(db sql.Executor) *ActivationService {
return &ActivationService{db: db}
}

type ActivationService struct {
db sql.Executor
}

func (s *ActivationService) RegisterService(server *grpc.Server) {
spacemeshv2alpha1.RegisterActivationServiceServer(server, s)
}

func (s *ActivationService) RegisterHandlerService(mux *runtime.ServeMux) error {
return spacemeshv2alpha1.RegisterActivationServiceHandlerServer(context.Background(), mux, s)
}

// String returns the service name.
func (s *ActivationService) String() string {
return "ActivationService"

Check warning on line 185 in api/grpcserver/v2alpha1/activation.go

View check run for this annotation

Codecov / codecov/patch

api/grpcserver/v2alpha1/activation.go#L184-L185

Added lines #L184 - L185 were not covered by tests
}

func (s *ActivationService) List(
ctx context.Context,
request *spacemeshv2alpha1.ActivationRequest,
) (*spacemeshv2alpha1.ActivationList, error) {
ops, err := toOperations(request)
if err != nil {
return nil, status.Error(codes.InvalidArgument, err.Error())

Check warning on line 194 in api/grpcserver/v2alpha1/activation.go

View check run for this annotation

Codecov / codecov/patch

api/grpcserver/v2alpha1/activation.go#L194

Added line #L194 was not covered by tests
}
// every full atx is ~1KB. 100 atxs is ~100KB.
switch {
case request.Limit > 100:
return nil, status.Error(codes.InvalidArgument, "limit is capped at 100")
case request.Limit == 0:
return nil, status.Error(codes.InvalidArgument, "limit must be set to <= 100")
}
rst := make([]*spacemeshv2alpha1.Activation, 0, request.Limit)
if err := atxs.IterateAtxsOps(s.db, ops, func(atx *types.VerifiedActivationTx) bool {
rst = append(rst, &spacemeshv2alpha1.Activation{Versioned: &spacemeshv2alpha1.Activation_V1{V1: toAtx(atx)}})
return true
}); err != nil {
return nil, status.Error(codes.Internal, err.Error())

Check warning on line 208 in api/grpcserver/v2alpha1/activation.go

View check run for this annotation

Codecov / codecov/patch

api/grpcserver/v2alpha1/activation.go#L208

Added line #L208 was not covered by tests
}
return &spacemeshv2alpha1.ActivationList{Activations: rst}, nil
}

func (s *ActivationService) ActivationsCount(
ctx context.Context,
request *spacemeshv2alpha1.ActivationsCountRequest,
) (*spacemeshv2alpha1.ActivationsCountResponse, error) {
ops := builder.Operations{Filter: []builder.Op{
{
Field: builder.Epoch,
Token: builder.Eq,
Value: int64(request.Epoch),
},
}}

count, err := atxs.CountAtxsByOps(s.db, ops)
if err != nil {
return nil, status.Error(codes.Internal, err.Error())

Check warning on line 227 in api/grpcserver/v2alpha1/activation.go

View check run for this annotation

Codecov / codecov/patch

api/grpcserver/v2alpha1/activation.go#L227

Added line #L227 was not covered by tests
}

return &spacemeshv2alpha1.ActivationsCountResponse{Count: count}, nil
}

func toRequest(filter *spacemeshv2alpha1.ActivationStreamRequest) *spacemeshv2alpha1.ActivationRequest {
return &spacemeshv2alpha1.ActivationRequest{
NodeId: filter.NodeId,
Id: filter.Id,
Coinbase: filter.Coinbase,
StartEpoch: filter.StartEpoch,
EndEpoch: filter.EndEpoch,
}
}

func toOperations(filter *spacemeshv2alpha1.ActivationRequest) (builder.Operations, error) {
ops := builder.Operations{}
if filter == nil {
return ops, nil

Check warning on line 246 in api/grpcserver/v2alpha1/activation.go

View check run for this annotation

Codecov / codecov/patch

api/grpcserver/v2alpha1/activation.go#L246

Added line #L246 was not covered by tests
}
if filter.NodeId != nil {
ops.Filter = append(ops.Filter, builder.Op{
Field: builder.Smesher,
Token: builder.Eq,
Value: filter.NodeId,
})
}
if filter.Id != nil {
ops.Filter = append(ops.Filter, builder.Op{
Field: builder.Id,
Token: builder.Eq,
Value: filter.Id,
})
}
if len(filter.Coinbase) > 0 {
addr, err := types.StringToAddress(filter.Coinbase)
if err != nil {
return builder.Operations{}, err

Check warning on line 265 in api/grpcserver/v2alpha1/activation.go

View check run for this annotation

Codecov / codecov/patch

api/grpcserver/v2alpha1/activation.go#L265

Added line #L265 was not covered by tests
}
ops.Filter = append(ops.Filter, builder.Op{
Field: builder.Coinbase,
Token: builder.Eq,
Value: addr.Bytes(),
})
}
if filter.StartEpoch != 0 {
ops.Filter = append(ops.Filter, builder.Op{
Field: builder.Epoch,
Token: builder.Gte,
Value: int64(filter.StartEpoch),
})
}
if filter.EndEpoch != 0 {
ops.Filter = append(ops.Filter, builder.Op{
Field: builder.Epoch,
Token: builder.Lte,
Value: int64(filter.EndEpoch),
})

Check warning on line 285 in api/grpcserver/v2alpha1/activation.go

View check run for this annotation

Codecov / codecov/patch

api/grpcserver/v2alpha1/activation.go#L281-L285

Added lines #L281 - L285 were not covered by tests
}

ops.Modifiers = append(ops.Modifiers, builder.Modifier{
Key: builder.OrderBy,
Value: "epoch asc, id",
})

if filter.Limit != 0 {
ops.Modifiers = append(ops.Modifiers, builder.Modifier{
Key: builder.Limit,
Value: int64(filter.Limit),
})
}
if filter.Offset != 0 {
ops.Modifiers = append(ops.Modifiers, builder.Modifier{
Key: builder.Offset,
Value: int64(filter.Offset),
})
}

return ops, nil
}

type resultsMatcher struct {
*spacemeshv2alpha1.ActivationStreamRequest
ctx context.Context
}

func (m *resultsMatcher) match(t *events.ActivationTx) bool {
if len(m.NodeId) > 0 {
var nodeId types.NodeID
copy(nodeId[:], m.NodeId)

if t.SmesherID != nodeId {
return false
}
}

if len(m.Id) > 0 {
var atxId types.ATXID
copy(atxId[:], m.Id)

if t.ID() != atxId {
return false
}
}

if len(m.Coinbase) > 0 {
addr, err := types.StringToAddress(m.Coinbase)
if err != nil {
ctxzap.Error(m.ctx, "unable to convert atx coinbase", zap.Error(err))
return false

Check warning on line 337 in api/grpcserver/v2alpha1/activation.go

View check run for this annotation

Codecov / codecov/patch

api/grpcserver/v2alpha1/activation.go#L336-L337

Added lines #L336 - L337 were not covered by tests
}
if t.Coinbase != addr {
return false
}
}

if m.StartEpoch != 0 {
if t.PublishEpoch.Uint32() < m.StartEpoch {
return false

Check warning on line 346 in api/grpcserver/v2alpha1/activation.go

View check run for this annotation

Codecov / codecov/patch

api/grpcserver/v2alpha1/activation.go#L346

Added line #L346 was not covered by tests
}
}

if m.EndEpoch != 0 {
if t.PublishEpoch.Uint32() > m.EndEpoch {
return false

Check warning on line 352 in api/grpcserver/v2alpha1/activation.go

View check run for this annotation

Codecov / codecov/patch

api/grpcserver/v2alpha1/activation.go#L351-L352

Added lines #L351 - L352 were not covered by tests
}
}

return true
}
Loading