diff --git a/api/grpcserver/activation_service.go b/api/grpcserver/activation_service.go index 4b5bb0c95e..08fbecb2df 100644 --- a/api/grpcserver/activation_service.go +++ b/api/grpcserver/activation_service.go @@ -14,8 +14,9 @@ import ( "google.golang.org/grpc/status" "google.golang.org/protobuf/types/known/emptypb" + "github.com/spacemeshos/go-spacemesh/codec" "github.com/spacemeshos/go-spacemesh/common/types" - "github.com/spacemeshos/go-spacemesh/events" + "github.com/spacemeshos/go-spacemesh/malfeasance/wire" "github.com/spacemeshos/go-spacemesh/sql" ) @@ -75,7 +76,6 @@ func (s *activationService) Get(ctx context.Context, request *pb.GetRequest) (*p proof, err := s.atxProvider.MalfeasanceProof(atx.SmesherID) if err != nil && !errors.Is(err, sql.ErrNotFound) { ctxzap.Error(ctx, "failed to get malfeasance proof", - zap.Stringer("smesher", atx.SmesherID), zap.Stringer("smesher", atx.SmesherID), zap.Stringer("id", atxId), zap.Error(err), @@ -86,7 +86,7 @@ func (s *activationService) Get(ctx context.Context, request *pb.GetRequest) (*p Atx: convertActivation(atx, prev), } if proof != nil { - resp.MalfeasanceProof = events.ToMalfeasancePB(atx.SmesherID, proof, false) + resp.MalfeasanceProof = toMalfeasancePB(atx.SmesherID, proof, false) } return resp, nil } @@ -117,3 +117,33 @@ func (s *activationService) Highest(ctx context.Context, req *emptypb.Empty) (*p Atx: convertActivation(atx, prev), }, nil } + +func toMalfeasancePB(nodeID types.NodeID, proof []byte, includeProof bool) *pb.MalfeasanceProof { + mp := &wire.MalfeasanceProof{} + if err := codec.Decode(proof, mp); err != nil { + return &pb.MalfeasanceProof{} + } + kind := pb.MalfeasanceProof_MALFEASANCE_UNSPECIFIED + switch mp.Proof.Type { + case wire.MultipleATXs: + kind = pb.MalfeasanceProof_MALFEASANCE_ATX + case wire.MultipleBallots: + kind = pb.MalfeasanceProof_MALFEASANCE_BALLOT + case wire.HareEquivocation: + kind = pb.MalfeasanceProof_MALFEASANCE_HARE + case wire.InvalidPostIndex: + kind = pb.MalfeasanceProof_MALFEASANCE_POST_INDEX + case wire.InvalidPrevATX: + kind = pb.MalfeasanceProof_MALFEASANCE_INCORRECT_PREV_ATX + } + result := &pb.MalfeasanceProof{ + SmesherId: &pb.SmesherId{Id: nodeID.Bytes()}, + Layer: &pb.LayerNumber{Number: mp.Layer.Uint32()}, + Kind: kind, + DebugInfo: wire.MalfeasanceInfo(nodeID, mp), + } + if includeProof { + result.Proof = proof + } + return result +} diff --git a/api/grpcserver/activation_service_test.go b/api/grpcserver/activation_service_test.go index b720026d96..6d65524c03 100644 --- a/api/grpcserver/activation_service_test.go +++ b/api/grpcserver/activation_service_test.go @@ -1,4 +1,4 @@ -package grpcserver_test +package grpcserver import ( "context" @@ -13,19 +13,17 @@ import ( "google.golang.org/grpc/status" "google.golang.org/protobuf/types/known/emptypb" - "github.com/spacemeshos/go-spacemesh/api/grpcserver" "github.com/spacemeshos/go-spacemesh/codec" "github.com/spacemeshos/go-spacemesh/common/types" - "github.com/spacemeshos/go-spacemesh/events" "github.com/spacemeshos/go-spacemesh/sql" "github.com/spacemeshos/go-spacemesh/sql/statesql" ) func Test_Highest_ReturnsGoldenAtxOnError(t *testing.T) { ctrl := gomock.NewController(t) - atxProvider := grpcserver.NewMockatxProvider(ctrl) + atxProvider := NewMockatxProvider(ctrl) goldenAtx := types.ATXID{2, 3, 4} - activationService := grpcserver.NewActivationService(atxProvider, goldenAtx) + activationService := NewActivationService(atxProvider, goldenAtx) atxProvider.EXPECT().MaxHeightAtx().Return(types.EmptyATXID, errors.New("blah")) response, err := activationService.Highest(context.Background(), &emptypb.Empty{}) @@ -41,9 +39,9 @@ func Test_Highest_ReturnsGoldenAtxOnError(t *testing.T) { func Test_Highest_ReturnsMaxTickHeight(t *testing.T) { ctrl := gomock.NewController(t) - atxProvider := grpcserver.NewMockatxProvider(ctrl) + atxProvider := NewMockatxProvider(ctrl) goldenAtx := types.ATXID{2, 3, 4} - activationService := grpcserver.NewActivationService(atxProvider, goldenAtx) + activationService := NewActivationService(atxProvider, goldenAtx) previous := types.RandomATXID() atx := types.ActivationTx{ @@ -71,8 +69,8 @@ func Test_Highest_ReturnsMaxTickHeight(t *testing.T) { func TestGet_RejectInvalidAtxID(t *testing.T) { ctrl := gomock.NewController(t) - atxProvider := grpcserver.NewMockatxProvider(ctrl) - activationService := grpcserver.NewActivationService(atxProvider, types.ATXID{1}) + atxProvider := NewMockatxProvider(ctrl) + activationService := NewActivationService(atxProvider, types.ATXID{1}) _, err := activationService.Get(context.Background(), &pb.GetRequest{Id: []byte{1, 2, 3}}) require.Error(t, err) @@ -81,8 +79,8 @@ func TestGet_RejectInvalidAtxID(t *testing.T) { func TestGet_AtxNotPresent(t *testing.T) { ctrl := gomock.NewController(t) - atxProvider := grpcserver.NewMockatxProvider(ctrl) - activationService := grpcserver.NewActivationService(atxProvider, types.ATXID{1}) + atxProvider := NewMockatxProvider(ctrl) + activationService := NewActivationService(atxProvider, types.ATXID{1}) id := types.RandomATXID() atxProvider.EXPECT().GetAtx(id).Return(nil, nil) @@ -94,8 +92,8 @@ func TestGet_AtxNotPresent(t *testing.T) { func TestGet_AtxProviderReturnsFailure(t *testing.T) { ctrl := gomock.NewController(t) - atxProvider := grpcserver.NewMockatxProvider(ctrl) - activationService := grpcserver.NewActivationService(atxProvider, types.ATXID{1}) + atxProvider := NewMockatxProvider(ctrl) + activationService := NewActivationService(atxProvider, types.ATXID{1}) id := types.RandomATXID() atxProvider.EXPECT().GetAtx(id).Return(&types.ActivationTx{}, errors.New("")) @@ -107,8 +105,8 @@ func TestGet_AtxProviderReturnsFailure(t *testing.T) { func TestGet_AtxProviderFailsObtainPreviousAtxs(t *testing.T) { ctrl := gomock.NewController(t) - atxProvider := grpcserver.NewMockatxProvider(ctrl) - activationService := grpcserver.NewActivationService(atxProvider, types.ATXID{1}) + atxProvider := NewMockatxProvider(ctrl) + activationService := NewActivationService(atxProvider, types.ATXID{1}) id := types.RandomATXID() atxProvider.EXPECT().GetAtx(id).Return(&types.ActivationTx{}, nil) @@ -121,8 +119,8 @@ func TestGet_AtxProviderFailsObtainPreviousAtxs(t *testing.T) { func TestGet_HappyPath(t *testing.T) { ctrl := gomock.NewController(t) - atxProvider := grpcserver.NewMockatxProvider(ctrl) - activationService := grpcserver.NewActivationService(atxProvider, types.ATXID{1}) + atxProvider := NewMockatxProvider(ctrl) + activationService := NewActivationService(atxProvider, types.ATXID{1}) previous := []types.ATXID{types.RandomATXID(), types.RandomATXID()} id := types.RandomATXID() @@ -155,10 +153,10 @@ func TestGet_HappyPath(t *testing.T) { func TestGet_IdentityCanceled(t *testing.T) { ctrl := gomock.NewController(t) - atxProvider := grpcserver.NewMockatxProvider(ctrl) - activationService := grpcserver.NewActivationService(atxProvider, types.ATXID{1}) + atxProvider := NewMockatxProvider(ctrl) + activationService := NewActivationService(atxProvider, types.ATXID{1}) - smesher, proof := grpcserver.BallotMalfeasance(t, statesql.InMemoryTest(t)) + smesher, proof := BallotMalfeasance(t, statesql.InMemoryTest(t)) previous := types.RandomATXID() id := types.RandomATXID() atx := types.ActivationTx{ @@ -185,5 +183,5 @@ func TestGet_IdentityCanceled(t *testing.T) { require.Equal(t, previous.Bytes(), response.Atx.PreviousAtxs[0].Id) require.Equal(t, atx.NumUnits, response.Atx.NumUnits) require.Equal(t, atx.Sequence, response.Atx.Sequence) - require.Equal(t, events.ToMalfeasancePB(smesher, codec.MustEncode(proof), false), response.MalfeasanceProof) + require.Equal(t, toMalfeasancePB(smesher, codec.MustEncode(proof), false), response.MalfeasanceProof) } diff --git a/api/grpcserver/debug_service.go b/api/grpcserver/debug_service.go index 264bb9e984..787734920d 100644 --- a/api/grpcserver/debug_service.go +++ b/api/grpcserver/debug_service.go @@ -220,7 +220,7 @@ func castEventProposal(ev *events.EventProposal) *pb.Proposal { for _, el := range ev.Proposal.Ballot.EligibilityProofs { proposal.Eligibilities = append(proposal.Eligibilities, &pb.Eligibility{ J: el.J, - Signature: el.Sig[:], + Signature: el.Sig.Bytes(), }) } return proposal diff --git a/api/grpcserver/mesh_service.go b/api/grpcserver/mesh_service.go index a256cf1f19..5243dbd411 100644 --- a/api/grpcserver/mesh_service.go +++ b/api/grpcserver/mesh_service.go @@ -615,7 +615,7 @@ func (s *MeshService) MalfeasanceQuery( return nil, status.Error(codes.Internal, err.Error()) } return &pb.MalfeasanceResponse{ - Proof: events.ToMalfeasancePB(id, proof, req.IncludeProof), + Proof: toMalfeasancePB(id, proof, req.IncludeProof), }, nil } @@ -627,7 +627,7 @@ func (s *MeshService) MalfeasanceStream( if sub == nil { return status.Errorf(codes.FailedPrecondition, "event reporting is not enabled") } - eventch, fullch := consumeEvents[events.EventMalfeasance](stream.Context(), sub) + eventCh, fullCh := consumeEvents[events.EventMalfeasance](stream.Context(), sub) if err := stream.SendHeader(metadata.MD{}); err != nil { return status.Errorf(codes.Unavailable, "can't send header") } @@ -639,7 +639,7 @@ func (s *MeshService) MalfeasanceStream( return nil default: res := &pb.MalfeasanceStreamResponse{ - Proof: events.ToMalfeasancePB(id, proof, req.IncludeProof), + Proof: toMalfeasancePB(id, proof, req.IncludeProof), } return stream.Send(res) } @@ -651,11 +651,18 @@ func (s *MeshService) MalfeasanceStream( select { case <-stream.Context().Done(): return nil - case <-fullch: + case <-fullCh: return status.Errorf(codes.Canceled, "buffer is full") - case ev := <-eventch: + case ev := <-eventCh: + proof, err := s.cdb.MalfeasanceProof(ev.Smesher) + if err != nil { + return status.Error( + codes.Internal, + fmt.Errorf("load malfeasance proof for %s: %w", ev.Smesher.ShortString(), err).Error(), + ) + } if err := stream.Send(&pb.MalfeasanceStreamResponse{ - Proof: events.ToMalfeasancePB(ev.Smesher, ev.Proof, req.IncludeProof), + Proof: toMalfeasancePB(ev.Smesher, proof, req.IncludeProof), }); err != nil { return status.Error(codes.Internal, fmt.Errorf("send to stream: %w", err).Error()) } diff --git a/api/grpcserver/mesh_service_test.go b/api/grpcserver/mesh_service_test.go index e08f23c045..988750b7cc 100644 --- a/api/grpcserver/mesh_service_test.go +++ b/api/grpcserver/mesh_service_test.go @@ -177,7 +177,7 @@ func TestMeshService_MalfeasanceQuery(t *testing.T) { require.Equal(t, nodeID, types.BytesToNodeID(resp.Proof.SmesherId.Id)) require.EqualValues(t, layer, resp.Proof.Layer.Number) require.Equal(t, pb.MalfeasanceProof_MALFEASANCE_BALLOT, resp.Proof.Kind) - require.Equal(t, events.ToMalfeasancePB(nodeID, codec.MustEncode(proof), true), resp.Proof) + require.Equal(t, toMalfeasancePB(nodeID, codec.MustEncode(proof), true), resp.Proof) require.NotEmpty(t, resp.Proof.Proof) var got wire.MalfeasanceProof require.NoError(t, codec.Decode(resp.Proof.Proof, &got)) @@ -251,16 +251,16 @@ func TestMeshService_MalfeasanceStream(t *testing.T) { id, proof := AtxMalfeasance(t, db) proofBytes := codec.MustEncode(proof) - events.ReportMalfeasance(id, proofBytes) + events.ReportMalfeasance(id) resp, err := stream.Recv() require.NoError(t, err) - require.Equal(t, events.ToMalfeasancePB(id, proofBytes, false), resp.Proof) + require.Equal(t, toMalfeasancePB(id, proofBytes, false), resp.Proof) id, proof = BallotMalfeasance(t, db) proofBytes = codec.MustEncode(proof) - events.ReportMalfeasance(id, proofBytes) + events.ReportMalfeasance(id) resp, err = stream.Recv() require.NoError(t, err) - require.Equal(t, events.ToMalfeasancePB(id, proofBytes, false), resp.Proof) + require.Equal(t, toMalfeasancePB(id, proofBytes, false), resp.Proof) } type MeshAPIMockInstrumented struct { diff --git a/api/grpcserver/v2alpha1/interface.go b/api/grpcserver/v2alpha1/interface.go index d0a03b528c..377d94937b 100644 --- a/api/grpcserver/v2alpha1/interface.go +++ b/api/grpcserver/v2alpha1/interface.go @@ -1,7 +1,13 @@ package v2alpha1 +import ( + "context" + + "github.com/spacemeshos/go-spacemesh/common/types" +) + //go:generate mockgen -typed -package=v2alpha1 -destination=./mocks.go -source=./interface.go type malfeasanceInfo interface { - Info(data []byte) (map[string]string, error) + Info(ctx context.Context, nodeID types.NodeID) (map[string]string, error) } diff --git a/api/grpcserver/v2alpha1/malfeasance.go b/api/grpcserver/v2alpha1/malfeasance.go index d2f9b5b41d..10e91527ac 100644 --- a/api/grpcserver/v2alpha1/malfeasance.go +++ b/api/grpcserver/v2alpha1/malfeasance.go @@ -23,6 +23,7 @@ import ( "github.com/spacemeshos/go-spacemesh/sql" "github.com/spacemeshos/go-spacemesh/sql/builder" "github.com/spacemeshos/go-spacemesh/sql/identities" + "github.com/spacemeshos/go-spacemesh/sql/malfeasance" ) const ( @@ -30,16 +31,18 @@ const ( MalfeasanceStream = "malfeasance_stream_v2alpha1" ) -func NewMalfeasanceService(db sql.Executor, malfeasanceHandler malfeasanceInfo) *MalfeasanceService { +func NewMalfeasanceService(db sql.StateDatabase, malHandler, legacyHandler malfeasanceInfo) *MalfeasanceService { return &MalfeasanceService{ - db: db, - info: malfeasanceHandler, + db: db, + info: malHandler, + infoLegacy: legacyHandler, } } type MalfeasanceService struct { - db sql.Executor - info malfeasanceInfo + db sql.StateDatabase + info malfeasanceInfo + infoLegacy malfeasanceInfo } func (s *MalfeasanceService) RegisterService(server *grpc.Server) { @@ -65,36 +68,65 @@ func (s *MalfeasanceService) List( return nil, status.Error(codes.InvalidArgument, "limit must be set to <= 100") } - ops, err := toMalfeasanceOps(request) - if err != nil { - return nil, status.Error(codes.InvalidArgument, err.Error()) - } + result := &spacemeshv2alpha1.MalfeasanceList{} + err := s.db.WithTx(ctx, func(tx sql.Transaction) error { + legacyCount, err := identities.CountMalicious(tx) + if err != nil { + return status.Error(codes.Internal, err.Error()) + } - proofs := make([]*spacemeshv2alpha1.MalfeasanceProof, 0, request.Limit) - if err := identities.IterateOps(s.db, ops, func(id types.NodeID, proof []byte, received time.Time) bool { - rst := toProof(ctx, s.info, id, proof) - if rst == nil { - return true + switch { + case request.Offset+request.Limit < legacyCount: // only legacy proofs + proofs, err := fetchLegacyFromDB(ctx, tx, s.infoLegacy, request) + if err != nil { + return err + } + result.Proofs = proofs + return nil + case request.Offset >= legacyCount: // only new proofs + request.Offset -= legacyCount + proofs, err := fetchFromDB(ctx, tx, s.info, request) + if err != nil { + return err + } + result.Proofs = proofs + return nil + default: // both legacy and new proofs + legacyProofs, err := fetchLegacyFromDB(ctx, tx, s.infoLegacy, request) + if err != nil { + return err + } + request.Limit -= uint64(len(legacyProofs)) + proofs, err := fetchFromDB(ctx, tx, s.info, request) + if err != nil { + return err + } + result.Proofs = append(legacyProofs, proofs...) + return nil } - proofs = append(proofs, rst) - return true - }); err != nil { - return nil, status.Error(codes.Internal, err.Error()) + }) + if err != nil { + return nil, err } - - return &spacemeshv2alpha1.MalfeasanceList{Proofs: proofs}, nil + return result, nil } -func NewMalfeasanceStreamService(db sql.Executor, malfeasanceHandler malfeasanceInfo) *MalfeasanceStreamService { +func NewMalfeasanceStreamService( + db sql.Executor, + malfeasanceHandler, + legacyHandler malfeasanceInfo, +) *MalfeasanceStreamService { return &MalfeasanceStreamService{ - db: db, - info: malfeasanceHandler, + db: db, + info: malfeasanceHandler, + infoLegacy: legacyHandler, } } type MalfeasanceStreamService struct { - db sql.Executor - info malfeasanceInfo + db sql.Executor + info malfeasanceInfo + infoLegacy malfeasanceInfo } func (s *MalfeasanceStreamService) RegisterService(server *grpc.Server) { @@ -113,47 +145,78 @@ func (s *MalfeasanceStreamService) Stream( request *spacemeshv2alpha1.MalfeasanceStreamRequest, stream spacemeshv2alpha1.MalfeasanceStreamService_StreamServer, ) error { - var sub *events.BufferedSubscription[events.EventMalfeasance] - if request.Watch { - matcher := malfeasanceMatcher{request} - var err error - sub, err = events.SubscribeMatched(matcher.match) - if err != nil { + legacyProofs, err := fetchLegacyFromDB( + stream.Context(), + s.db, + s.infoLegacy, + &spacemeshv2alpha1.MalfeasanceRequest{SmesherId: request.SmesherId}, + ) + if err != nil { + return err + } + for _, rst := range legacyProofs { + err := stream.Send(rst) + switch { + case errors.Is(err, io.EOF): + return nil + case err != nil: return status.Error(codes.Internal, err.Error()) } - defer sub.Close() - if err := stream.SendHeader(metadata.MD{}); err != nil { - return status.Errorf(codes.Unavailable, "can't send header") - } } - ops, err := toMalfeasanceOps(&spacemeshv2alpha1.MalfeasanceRequest{ - SmesherId: request.SmesherId, - }) + proofs, err := fetchFromDB( + stream.Context(), + s.db, + s.info, + &spacemeshv2alpha1.MalfeasanceRequest{SmesherId: request.SmesherId}, + ) if err != nil { - return status.Error(codes.InvalidArgument, err.Error()) + return err + } + for _, rst := range proofs { + err := stream.Send(rst) + switch { + case errors.Is(err, io.EOF): + return nil + case err != nil: + return status.Error(codes.Internal, err.Error()) + } + } + + if !request.Watch { + return nil } - ctx, cancel := context.WithCancel(stream.Context()) - defer cancel() - dbChan, errChan := s.fetchFromDB(ctx, ops) + matcher := malfeasanceMatcher{request} + sub, err := events.SubscribeMatched(matcher.match) + if err != nil { + return status.Error(codes.Internal, err.Error()) + } + defer sub.Close() + eventsOut := sub.Out() + eventsFull := sub.Full() - var eventsOut <-chan events.EventMalfeasance - var eventsFull <-chan struct{} - if sub != nil { - eventsOut = sub.Out() - eventsFull = sub.Full() + if err := stream.SendHeader(metadata.MD{}); err != nil { + return status.Errorf(codes.Unavailable, "can't send header") } for { select { - // process events first + // process pending events first case rst := <-eventsOut: - proof := toProof(stream.Context(), s.info, rst.Smesher, rst.Proof) + proof := fetchMetaData(stream.Context(), s.infoLegacy, rst.Smesher) if proof == nil { - continue + // try again with the new handler + proof = fetchMetaData(stream.Context(), s.info, rst.Smesher) + if proof == nil { + ctxzap.Debug(stream.Context(), "failed to get malfeasance info", + zap.String("smesher", rst.Smesher.String()), + zap.Error(err), + ) + continue + } } - err = stream.Send(proof) + err := stream.Send(proof) switch { case errors.Is(err, io.EOF): return nil @@ -163,11 +226,19 @@ func (s *MalfeasanceStreamService) Stream( default: select { case rst := <-eventsOut: - proof := toProof(stream.Context(), s.info, rst.Smesher, rst.Proof) + proof := fetchMetaData(stream.Context(), s.infoLegacy, rst.Smesher) if proof == nil { - continue + // try again with the new handler + proof = fetchMetaData(stream.Context(), s.info, rst.Smesher) + if proof == nil { + ctxzap.Debug(stream.Context(), "failed to get malfeasance info", + zap.String("smesher", rst.Smesher.String()), + zap.Error(err), + ) + continue + } } - err = stream.Send(proof) + err := stream.Send(proof) switch { case errors.Is(err, io.EOF): return nil @@ -176,23 +247,6 @@ func (s *MalfeasanceStreamService) Stream( } case <-eventsFull: return status.Error(codes.Canceled, "buffer overflow") - case rst, ok := <-dbChan: - if !ok { - dbChan = nil - if sub == nil { - return nil - } - continue - } - err = stream.Send(rst) - switch { - case errors.Is(err, io.EOF): - return nil - case err != nil: - return status.Error(codes.Internal, err.Error()) - } - case err := <-errChan: - return err case <-stream.Context().Done(): return nil } @@ -200,49 +254,13 @@ func (s *MalfeasanceStreamService) Stream( } } -func (s *MalfeasanceStreamService) fetchFromDB( - ctx context.Context, - ops builder.Operations, -) (<-chan *spacemeshv2alpha1.MalfeasanceProof, <-chan error) { - dbChan := make(chan *spacemeshv2alpha1.MalfeasanceProof) - errChan := make(chan error, 1) // buffered to avoid blocking, routine should exit immediately after sending an error - - go func() { - defer close(dbChan) - if err := identities.IterateOps(s.db, ops, - func(id types.NodeID, proof []byte, received time.Time) bool { - rst := toProof(ctx, s.info, id, proof) - if rst == nil { - return true - } - - select { - case dbChan <- rst: - return true - case <-ctx.Done(): - // exit if the context is canceled - return false - } - }, - ); err != nil { - errChan <- status.Error(codes.Internal, err.Error()) - } - }() - return dbChan, errChan -} - -func toProof( +func fetchMetaData( ctx context.Context, info malfeasanceInfo, id types.NodeID, - proof []byte, ) *spacemeshv2alpha1.MalfeasanceProof { - properties, err := info.Info(proof) + properties, err := info.Info(ctx, id) if err != nil { - ctxzap.Debug(ctx, "failed to get malfeasance info", - zap.String("smesher", id.String()), - zap.Error(err), - ) return nil } domain, err := strconv.ParseUint(properties["domain"], 10, 64) @@ -267,18 +285,70 @@ func toProof( delete(properties, "type") return &spacemeshv2alpha1.MalfeasanceProof{ Smesher: id.Bytes(), - Domain: spacemeshv2alpha1.MalfeasanceProof_MalfeasanceDomain(domain), + Domain: spacemeshv2alpha1.MalfeasanceProof_MalfeasanceDomain(domain), // TODO(mafa): add new domains Type: uint32(proofType), Properties: properties, } } +func fetchFromDB( + ctx context.Context, + db sql.Executor, + info malfeasanceInfo, + request *spacemeshv2alpha1.MalfeasanceRequest, +) ([]*spacemeshv2alpha1.MalfeasanceProof, error) { + ops, err := toMalfeasanceOps(request) + if err != nil { + return nil, status.Error(codes.InvalidArgument, err.Error()) + } + ids := make([]types.NodeID, 0, request.Limit) + if err := malfeasance.IterateOps(db, ops, func(id types.NodeID, _ []byte, _ int, _ time.Time) bool { + ids = append(ids, id) + return true + }); err != nil { + return nil, status.Error(codes.Internal, err.Error()) + } + proofs := make([]*spacemeshv2alpha1.MalfeasanceProof, 0, len(ids)) + for _, id := range ids { + rst := fetchMetaData(ctx, info, id) + if rst == nil { + continue + } + proofs = append(proofs, rst) + } + return proofs, nil +} + +func fetchLegacyFromDB( + ctx context.Context, + db sql.Executor, + info malfeasanceInfo, + request *spacemeshv2alpha1.MalfeasanceRequest, +) ([]*spacemeshv2alpha1.MalfeasanceProof, error) { + ops, err := toMalfeasanceOps(request) + if err != nil { + return nil, status.Error(codes.InvalidArgument, err.Error()) + } + ids := make([]types.NodeID, 0, request.Limit) + if err := identities.IterateOps(db, ops, func(id types.NodeID, _ []byte, _ time.Time) bool { + ids = append(ids, id) + return true + }); err != nil { + return nil, status.Error(codes.Internal, err.Error()) + } + proofs := make([]*spacemeshv2alpha1.MalfeasanceProof, 0, len(ids)) + for _, id := range ids { + rst := fetchMetaData(ctx, info, id) + if rst == nil { + continue + } + proofs = append(proofs, rst) + } + return proofs, nil +} + func toMalfeasanceOps(filter *spacemeshv2alpha1.MalfeasanceRequest) (builder.Operations, error) { ops := builder.Operations{} - ops.Filter = append(ops.Filter, builder.Op{ - Field: builder.Proof, - Token: builder.IsNotNull, - }) ops.Modifiers = append(ops.Modifiers, builder.Modifier{ Key: builder.OrderBy, Value: builder.Smesher, diff --git a/api/grpcserver/v2alpha1/malfeasance_test.go b/api/grpcserver/v2alpha1/malfeasance_test.go index 22744ad648..4ed7616e3c 100644 --- a/api/grpcserver/v2alpha1/malfeasance_test.go +++ b/api/grpcserver/v2alpha1/malfeasance_test.go @@ -5,6 +5,7 @@ import ( "errors" "fmt" "io" + "maps" "strconv" "testing" "time" @@ -20,6 +21,8 @@ import ( "github.com/spacemeshos/go-spacemesh/events" "github.com/spacemeshos/go-spacemesh/sql" "github.com/spacemeshos/go-spacemesh/sql/identities" + "github.com/spacemeshos/go-spacemesh/sql/malfeasance" + "github.com/spacemeshos/go-spacemesh/sql/marriage" "github.com/spacemeshos/go-spacemesh/sql/statesql" ) @@ -31,34 +34,93 @@ type malInfo struct { } func TestMalfeasanceService_List(t *testing.T) { - setup := func(t *testing.T) (spacemeshv2alpha1.MalfeasanceServiceClient, []malInfo) { - db := statesql.InMemoryTest(t) - ctrl := gomock.NewController(t) - info := NewMockmalfeasanceInfo(ctrl) - - proofs := make([]malInfo, 90) - for i := range proofs { - proofs[i] = malInfo{ID: types.RandomNodeID(), Proof: types.RandomBytes(100)} - proofs[i].Properties = map[string]string{ - "domain": "0", - "type": strconv.FormatUint(uint64(i%4+1), 10), - fmt.Sprintf("key%d", i): fmt.Sprintf("value%d", i), - } - info.EXPECT().Info(proofs[i].Proof).Return(proofs[i].Properties, nil).AnyTimes() - - require.NoError(t, identities.SetMalicious(db, proofs[i].ID, proofs[i].Proof, time.Now())) + db := statesql.InMemoryTest(t) + ctrl := gomock.NewController(t) + info := NewMockmalfeasanceInfo(ctrl) + legacyInfo := NewMockmalfeasanceInfo(ctrl) + + proofs := make([]malInfo, 90) + + // first 20 are legacy proofs + for i := range 20 { + proofs[i] = malInfo{ID: types.RandomNodeID(), Proof: types.RandomBytes(100)} + proofs[i].Properties = map[string]string{ + "domain": "0", + "type": strconv.FormatUint(uint64(i%4+1), 10), + fmt.Sprintf("key%d", i): fmt.Sprintf("value%d", i), } + info.EXPECT().Info(gomock.Any(), proofs[i].ID).Return(nil, sql.ErrNotFound).AnyTimes() + legacyInfo.EXPECT().Info(gomock.Any(), proofs[i].ID).DoAndReturn( + func(_ context.Context, id types.NodeID) (map[string]string, error) { + return maps.Clone(proofs[i].Properties), nil + }).AnyTimes() + require.NoError(t, identities.SetMalicious(db, proofs[i].ID, proofs[i].Proof, time.Now())) + } - svc := NewMalfeasanceService(db, info) - cfg, cleanup := launchServer(t, svc) - t.Cleanup(cleanup) + // next 50 are proofs for individual identities + for i := 20; i < 70; i++ { + proofs[i] = malInfo{ID: types.RandomNodeID(), Proof: types.RandomBytes(100)} + proofs[i].Properties = map[string]string{ + "domain": strconv.FormatUint(uint64(i%4+1), 10), + "type": strconv.FormatUint(uint64(i%4+1), 10), + fmt.Sprintf("key%d", i): fmt.Sprintf("value%d", i), + } + info.EXPECT().Info(gomock.Any(), proofs[i].ID).DoAndReturn( + func(_ context.Context, id types.NodeID) (map[string]string, error) { + return maps.Clone(proofs[i].Properties), nil + }).AnyTimes() + legacyInfo.EXPECT().Info(gomock.Any(), proofs[i].ID).Return(nil, sql.ErrNotFound).AnyTimes() + require.NoError(t, malfeasance.AddProof(db, proofs[i].ID, nil, proofs[i].Proof, i%4+1, time.Now())) + } - conn := dialGrpc(t, cfg) - return spacemeshv2alpha1.NewMalfeasanceServiceClient(conn), proofs + // last 20 are proofs for a single marriage + id, err := marriage.NewID(db) + require.NoError(t, err) + marriageATX := types.RandomATXID() + for i := 70; i < 90; i++ { + proofs[i] = malInfo{ID: types.RandomNodeID()} + err := marriage.Add(db, marriage.Info{ + ID: id, + NodeID: proofs[i].ID, + ATX: marriageATX, + MarriageIndex: i % 70, + Target: proofs[70].ID, + Signature: types.RandomEdSignature(), + }) + require.NoError(t, err) + } + proofs[70].Proof = types.RandomBytes(100) + proofs[70].Properties = map[string]string{ + "domain": "1", + "type": "1", + "key": "value", } + info.EXPECT().Info(gomock.Any(), proofs[70].ID).DoAndReturn( + func(_ context.Context, id types.NodeID) (map[string]string, error) { + return maps.Clone(proofs[70].Properties), nil + }).AnyTimes() + legacyInfo.EXPECT().Info(gomock.Any(), proofs[70].ID).Return(nil, sql.ErrNotFound).AnyTimes() + require.NoError(t, malfeasance.AddProof(db, proofs[70].ID, &id, proofs[70].Proof, 1, time.Now())) + for i := 71; i < 90; i++ { + proofs[i] = malInfo{ID: proofs[i].ID, Proof: proofs[70].Proof} + proofs[i].Properties = maps.Clone(proofs[70].Properties) + proofs[i].Properties["malicious_id"] = proofs[i].ID.String() + + info.EXPECT().Info(gomock.Any(), proofs[i].ID).DoAndReturn( + func(_ context.Context, id types.NodeID) (map[string]string, error) { + return maps.Clone(proofs[i].Properties), nil + }).AnyTimes() + legacyInfo.EXPECT().Info(gomock.Any(), proofs[i].ID).Return(nil, sql.ErrNotFound).AnyTimes() + + require.NoError(t, malfeasance.SetMalicious(db, proofs[i].ID, id, time.Now())) + } + + svc := NewMalfeasanceService(db, info, legacyInfo) + cfg, cleanup := launchServer(t, svc) + t.Cleanup(cleanup) t.Run("limit set too high", func(t *testing.T) { - client, _ := setup(t) + client := spacemeshv2alpha1.NewMalfeasanceServiceClient(dialGrpc(t, cfg)) _, err := client.List(context.Background(), &spacemeshv2alpha1.MalfeasanceRequest{Limit: 200}) require.Error(t, err) @@ -69,7 +131,7 @@ func TestMalfeasanceService_List(t *testing.T) { }) t.Run("no limit set", func(t *testing.T) { - client, _ := setup(t) + client := spacemeshv2alpha1.NewMalfeasanceServiceClient(dialGrpc(t, cfg)) _, err := client.List(context.Background(), &spacemeshv2alpha1.MalfeasanceRequest{}) require.Error(t, err) @@ -80,7 +142,7 @@ func TestMalfeasanceService_List(t *testing.T) { }) t.Run("limit and offset", func(t *testing.T) { - client, _ := setup(t) + client := spacemeshv2alpha1.NewMalfeasanceServiceClient(dialGrpc(t, cfg)) list, err := client.List(context.Background(), &spacemeshv2alpha1.MalfeasanceRequest{ Limit: 25, Offset: 50, @@ -90,14 +152,14 @@ func TestMalfeasanceService_List(t *testing.T) { }) t.Run("all", func(t *testing.T) { - client, _ := setup(t) + client := spacemeshv2alpha1.NewMalfeasanceServiceClient(dialGrpc(t, cfg)) list, err := client.List(context.Background(), &spacemeshv2alpha1.MalfeasanceRequest{Limit: 100}) require.NoError(t, err) require.Len(t, list.Proofs, 90) }) t.Run("smesherId", func(t *testing.T) { - client, proofs := setup(t) + client := spacemeshv2alpha1.NewMalfeasanceServiceClient(dialGrpc(t, cfg)) list, err := client.List(context.Background(), &spacemeshv2alpha1.MalfeasanceRequest{ Limit: 1, SmesherId: [][]byte{proofs[1].ID.Bytes()}, @@ -112,21 +174,84 @@ func TestMalfeasanceStreamService_Stream(t *testing.T) { t *testing.T, db sql.Executor, info *MockmalfeasanceInfo, + legacyInfo *MockmalfeasanceInfo, ) spacemeshv2alpha1.MalfeasanceStreamServiceClient { proofs := make([]malInfo, 90) - for i := range proofs { + // first 20 are legacy proofs + for i := range 20 { proofs[i] = malInfo{ID: types.RandomNodeID(), Proof: types.RandomBytes(100)} proofs[i].Properties = map[string]string{ "domain": "0", "type": strconv.FormatUint(uint64(i%4+1), 10), fmt.Sprintf("key%d", i): fmt.Sprintf("value%d", i), } - info.EXPECT().Info(proofs[i].Proof).Return(proofs[i].Properties, nil).AnyTimes() - + info.EXPECT().Info(gomock.Any(), proofs[i].ID).Return(nil, sql.ErrNotFound).AnyTimes() + legacyInfo.EXPECT().Info(gomock.Any(), proofs[i].ID).DoAndReturn( + func(_ context.Context, id types.NodeID) (map[string]string, error) { + return maps.Clone(proofs[i].Properties), nil + }).AnyTimes() require.NoError(t, identities.SetMalicious(db, proofs[i].ID, proofs[i].Proof, time.Now())) } - svc := NewMalfeasanceStreamService(db, info) + // next 50 are proofs for individual identities + for i := 20; i < 70; i++ { + proofs[i] = malInfo{ID: types.RandomNodeID(), Proof: types.RandomBytes(100)} + proofs[i].Properties = map[string]string{ + "domain": strconv.FormatUint(uint64(i%4+1), 10), + "type": strconv.FormatUint(uint64(i%4+1), 10), + fmt.Sprintf("key%d", i): fmt.Sprintf("value%d", i), + } + info.EXPECT().Info(gomock.Any(), proofs[i].ID).DoAndReturn( + func(_ context.Context, id types.NodeID) (map[string]string, error) { + return maps.Clone(proofs[i].Properties), nil + }).AnyTimes() + legacyInfo.EXPECT().Info(gomock.Any(), proofs[i].ID).Return(nil, sql.ErrNotFound).AnyTimes() + require.NoError(t, malfeasance.AddProof(db, proofs[i].ID, nil, proofs[i].Proof, i%4+1, time.Now())) + } + + // last 20 are proofs for a single marriage + id, err := marriage.NewID(db) + require.NoError(t, err) + marriageATX := types.RandomATXID() + for i := 70; i < 90; i++ { + proofs[i] = malInfo{ID: types.RandomNodeID()} + err := marriage.Add(db, marriage.Info{ + ID: id, + NodeID: proofs[i].ID, + ATX: marriageATX, + MarriageIndex: i % 70, + Target: proofs[70].ID, + Signature: types.RandomEdSignature(), + }) + require.NoError(t, err) + } + proofs[70].Proof = types.RandomBytes(100) + proofs[70].Properties = map[string]string{ + "domain": "1", + "type": "1", + "key": "value", + } + info.EXPECT().Info(gomock.Any(), proofs[70].ID).DoAndReturn( + func(_ context.Context, id types.NodeID) (map[string]string, error) { + return maps.Clone(proofs[70].Properties), nil + }).AnyTimes() + legacyInfo.EXPECT().Info(gomock.Any(), proofs[70].ID).Return(nil, sql.ErrNotFound).AnyTimes() + require.NoError(t, malfeasance.AddProof(db, proofs[70].ID, &id, proofs[70].Proof, 1, time.Now())) + for i := 71; i < 90; i++ { + proofs[i] = malInfo{ID: proofs[i].ID, Proof: proofs[70].Proof} + proofs[i].Properties = maps.Clone(proofs[70].Properties) + proofs[i].Properties["malicious_id"] = proofs[i].ID.String() + + info.EXPECT().Info(gomock.Any(), proofs[i].ID).DoAndReturn( + func(_ context.Context, id types.NodeID) (map[string]string, error) { + return maps.Clone(proofs[i].Properties), nil + }).AnyTimes() + legacyInfo.EXPECT().Info(gomock.Any(), proofs[i].ID).Return(nil, sql.ErrNotFound).AnyTimes() + + require.NoError(t, malfeasance.SetMalicious(db, proofs[i].ID, id, time.Now())) + } + + svc := NewMalfeasanceStreamService(db, info, legacyInfo) cfg, cleanup := launchServer(t, svc) t.Cleanup(cleanup) @@ -138,9 +263,11 @@ func TestMalfeasanceStreamService_Stream(t *testing.T) { events.InitializeReporter() t.Cleanup(events.CloseEventReporter) + db := statesql.InMemoryTest(t) ctrl := gomock.NewController(t) info := NewMockmalfeasanceInfo(ctrl) - client := setup(t, statesql.InMemoryTest(t), info) + legacyInfo := NewMockmalfeasanceInfo(ctrl) + client := setup(t, db, info, legacyInfo) stream, err := client.Stream(context.Background(), &spacemeshv2alpha1.MalfeasanceStreamRequest{}) require.NoError(t, err) @@ -163,30 +290,82 @@ func TestMalfeasanceStreamService_Stream(t *testing.T) { db := statesql.InMemoryTest(t) ctrl := gomock.NewController(t) info := NewMockmalfeasanceInfo(ctrl) - client := setup(t, db, info) + legacyInfo := NewMockmalfeasanceInfo(ctrl) + client := setup(t, db, info, legacyInfo) const ( - start = 100 - n = 10 + nLegacy = 5 + nIndividual = 5 + nMarriage = 5 ) - var streamed []*events.EventMalfeasance - for i := 0; i < n; i++ { + streamed := make([]*events.EventMalfeasance, 0, nLegacy+nIndividual+nMarriage) + for i := 0; i < nLegacy; i++ { smesher := types.RandomNodeID() streamed = append(streamed, &events.EventMalfeasance{ Smesher: smesher, - Proof: types.RandomBytes(100), }) properties := map[string]string{ "domain": "0", "type": strconv.FormatUint(uint64(i%4+1), 10), fmt.Sprintf("key%d", i): fmt.Sprintf("value%d", i), } - info.EXPECT().Info(streamed[i].Proof).Return(properties, nil).AnyTimes() + info.EXPECT().Info(gomock.Any(), streamed[i].Smesher).Return(nil, sql.ErrNotFound).AnyTimes() + legacyInfo.EXPECT().Info(gomock.Any(), streamed[i].Smesher).DoAndReturn( + func(_ context.Context, id types.NodeID) (map[string]string, error) { + return maps.Clone(properties), nil + }).AnyTimes() + } + + for i := nLegacy; i < nLegacy+nIndividual; i++ { + smesher := types.RandomNodeID() + streamed = append(streamed, &events.EventMalfeasance{ + Smesher: smesher, + }) + properties := map[string]string{ + "domain": strconv.FormatUint(uint64(i%4+1), 10), + "type": strconv.FormatUint(uint64(i%4+1), 10), + fmt.Sprintf("key%d", i): fmt.Sprintf("value%d", i), + } + info.EXPECT().Info(gomock.Any(), streamed[i].Smesher).DoAndReturn( + func(_ context.Context, id types.NodeID) (map[string]string, error) { + return maps.Clone(properties), nil + }).AnyTimes() + legacyInfo.EXPECT().Info(gomock.Any(), streamed[i].Smesher).Return(nil, sql.ErrNotFound).AnyTimes() + } + + id, err := marriage.NewID(db) + require.NoError(t, err) + marriageATX := types.RandomATXID() + for i := nLegacy + nIndividual; i < nLegacy+nIndividual+nMarriage; i++ { + smesher := types.RandomNodeID() + streamed = append(streamed, &events.EventMalfeasance{ + Smesher: smesher, + }) + properties := map[string]string{ + "domain": "1", + "type": "1", + "key": "value", + } + info.EXPECT().Info(gomock.Any(), streamed[i].Smesher).DoAndReturn( + func(_ context.Context, id types.NodeID) (map[string]string, error) { + return maps.Clone(properties), nil + }).AnyTimes() + legacyInfo.EXPECT().Info(gomock.Any(), streamed[i].Smesher).Return(nil, sql.ErrNotFound).AnyTimes() + + err := marriage.Add(db, marriage.Info{ + ID: id, + NodeID: streamed[i].Smesher, + ATX: marriageATX, + MarriageIndex: i%nLegacy + nIndividual, + Target: streamed[nLegacy+nIndividual].Smesher, + Signature: types.RandomEdSignature(), + }) + require.NoError(t, err) } request := &spacemeshv2alpha1.MalfeasanceStreamRequest{ - SmesherId: [][]byte{streamed[3].Smesher.Bytes()}, + SmesherId: [][]byte{streamed[3].Smesher.Bytes(), streamed[7].Smesher.Bytes(), streamed[12].Smesher.Bytes()}, Watch: true, } stream, err := client.Stream(context.Background(), request) @@ -194,9 +373,9 @@ func TestMalfeasanceStreamService_Stream(t *testing.T) { _, err = stream.Header() require.NoError(t, err) - var expect []types.NodeID + expect := make([]types.NodeID, 0, len(request.SmesherId)) for _, rst := range streamed { - events.ReportMalfeasance(rst.Smesher, rst.Proof) + events.ReportMalfeasance(rst.Smesher) matcher := malfeasanceMatcher{request} if matcher.match(rst) { expect = append(expect, rst.Smesher) diff --git a/api/grpcserver/v2alpha1/mocks.go b/api/grpcserver/v2alpha1/mocks.go index 6f7f88adf8..0a4529afab 100644 --- a/api/grpcserver/v2alpha1/mocks.go +++ b/api/grpcserver/v2alpha1/mocks.go @@ -10,8 +10,10 @@ package v2alpha1 import ( + context "context" reflect "reflect" + types "github.com/spacemeshos/go-spacemesh/common/types" gomock "go.uber.org/mock/gomock" ) @@ -40,18 +42,18 @@ func (m *MockmalfeasanceInfo) EXPECT() *MockmalfeasanceInfoMockRecorder { } // Info mocks base method. -func (m *MockmalfeasanceInfo) Info(data []byte) (map[string]string, error) { +func (m *MockmalfeasanceInfo) Info(ctx context.Context, nodeID types.NodeID) (map[string]string, error) { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "Info", data) + ret := m.ctrl.Call(m, "Info", ctx, nodeID) ret0, _ := ret[0].(map[string]string) ret1, _ := ret[1].(error) return ret0, ret1 } // Info indicates an expected call of Info. -func (mr *MockmalfeasanceInfoMockRecorder) Info(data any) *MockmalfeasanceInfoInfoCall { +func (mr *MockmalfeasanceInfoMockRecorder) Info(ctx, nodeID any) *MockmalfeasanceInfoInfoCall { mr.mock.ctrl.T.Helper() - call := mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Info", reflect.TypeOf((*MockmalfeasanceInfo)(nil).Info), data) + call := mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Info", reflect.TypeOf((*MockmalfeasanceInfo)(nil).Info), ctx, nodeID) return &MockmalfeasanceInfoInfoCall{Call: call} } @@ -67,13 +69,13 @@ func (c *MockmalfeasanceInfoInfoCall) Return(arg0 map[string]string, arg1 error) } // Do rewrite *gomock.Call.Do -func (c *MockmalfeasanceInfoInfoCall) Do(f func([]byte) (map[string]string, error)) *MockmalfeasanceInfoInfoCall { +func (c *MockmalfeasanceInfoInfoCall) Do(f func(context.Context, types.NodeID) (map[string]string, error)) *MockmalfeasanceInfoInfoCall { c.Call = c.Call.Do(f) return c } // DoAndReturn rewrite *gomock.Call.DoAndReturn -func (c *MockmalfeasanceInfoInfoCall) DoAndReturn(f func([]byte) (map[string]string, error)) *MockmalfeasanceInfoInfoCall { +func (c *MockmalfeasanceInfoInfoCall) DoAndReturn(f func(context.Context, types.NodeID) (map[string]string, error)) *MockmalfeasanceInfoInfoCall { c.Call = c.Call.DoAndReturn(f) return c } diff --git a/events/events.go b/events/events.go index 8a07fa5bc5..80b2f1ab67 100644 --- a/events/events.go +++ b/events/events.go @@ -7,10 +7,8 @@ import ( "google.golang.org/protobuf/types/known/durationpb" "google.golang.org/protobuf/types/known/timestamppb" - "github.com/spacemeshos/go-spacemesh/codec" "github.com/spacemeshos/go-spacemesh/common/types" "github.com/spacemeshos/go-spacemesh/log" - "github.com/spacemeshos/go-spacemesh/malfeasance/wire" ) type UserEvent struct { @@ -332,21 +330,25 @@ func EmitProposal(nodeID types.NodeID, layer types.LayerID, proposal types.Propo &pb.Event_Proposal{ Proposal: &pb.EventProposal{ Layer: layer.Uint32(), - Proposal: proposal[:], + Proposal: proposal.Bytes(), Smesher: nodeID.Bytes(), }, }, ) } -func EmitOwnMalfeasanceProof(nodeID types.NodeID, proof []byte) { +func EmitOwnMalfeasanceProof(nodeID types.NodeID) { const help = "Node committed malicious behavior. Identity will be canceled." emitUserEvent( help, false, &pb.Event_Malfeasance{ Malfeasance: &pb.EventMalfeasance{ - Proof: ToMalfeasancePB(nodeID, proof, false), + Proof: &pb.MalfeasanceProof{ + SmesherId: &pb.SmesherId{Id: nodeID.Bytes()}, + Layer: &pb.LayerNumber{Number: uint32(0)}, + Kind: pb.MalfeasanceProof_MALFEASANCE_UNSPECIFIED, + }, }, }, ) @@ -366,33 +368,3 @@ func emitUserEvent(help string, failure bool, details pb.IsEventDetails) { } } } - -func ToMalfeasancePB(nodeID types.NodeID, proof []byte, includeProof bool) *pb.MalfeasanceProof { - mp := &wire.MalfeasanceProof{} - if err := codec.Decode(proof, mp); err != nil { - return &pb.MalfeasanceProof{} - } - kind := pb.MalfeasanceProof_MALFEASANCE_UNSPECIFIED - switch mp.Proof.Type { - case wire.MultipleATXs: - kind = pb.MalfeasanceProof_MALFEASANCE_ATX - case wire.MultipleBallots: - kind = pb.MalfeasanceProof_MALFEASANCE_BALLOT - case wire.HareEquivocation: - kind = pb.MalfeasanceProof_MALFEASANCE_HARE - case wire.InvalidPostIndex: - kind = pb.MalfeasanceProof_MALFEASANCE_POST_INDEX - case wire.InvalidPrevATX: - kind = pb.MalfeasanceProof_MALFEASANCE_INCORRECT_PREV_ATX - } - result := &pb.MalfeasanceProof{ - SmesherId: &pb.SmesherId{Id: nodeID.Bytes()}, - Layer: &pb.LayerNumber{Number: mp.Layer.Uint32()}, - Kind: kind, - DebugInfo: wire.MalfeasanceInfo(nodeID, mp), - } - if includeProof { - result.Proof = proof - } - return result -} diff --git a/events/malfeasance.go b/events/malfeasance.go index 4ea0ef6ea8..ba789881d1 100644 --- a/events/malfeasance.go +++ b/events/malfeasance.go @@ -8,7 +8,6 @@ import ( // EventMalfeasance includes the malfeasance proof. type EventMalfeasance struct { Smesher types.NodeID - Proof []byte } // SubscribeMalfeasance subscribes malfeasance events. @@ -26,11 +25,11 @@ func SubscribeMalfeasance() Subscription { } // ReportMalfeasance reports a malfeasance proof. -func ReportMalfeasance(nodeID types.NodeID, proof []byte) { +func ReportMalfeasance(nodeID types.NodeID) { mu.RLock() defer mu.RUnlock() if reporter != nil { - if err := reporter.malfeasanceEmitter.Emit(EventMalfeasance{Smesher: nodeID, Proof: proof}); err != nil { + if err := reporter.malfeasanceEmitter.Emit(EventMalfeasance{Smesher: nodeID}); err != nil { log.With().Error("failed to emit malfeasance proof", log.Err(err)) } } diff --git a/events/reporter.go b/events/reporter.go index 6e5a22367e..35c02ce969 100644 --- a/events/reporter.go +++ b/events/reporter.go @@ -138,6 +138,9 @@ func ReportNodeStatusUpdate() error { // ReportResult reports creation or receipt of a new tx receipt. func ReportResult(rst types.TransactionWithResult) error { + mu.RLock() + defer mu.RUnlock() + if reporter != nil { return reporter.resultsEmitter.Emit(rst) } @@ -451,6 +454,7 @@ func CloseEventReporter() { mu.Lock() defer mu.Unlock() if reporter != nil { + close(reporter.stopChan) if err := reporter.transactionEmitter.Close(); err != nil { log.With().Panic("failed to close transactionEmitter", log.Err(err)) } @@ -481,8 +485,9 @@ func CloseEventReporter() { if err := reporter.malfeasanceEmitter.Close(); err != nil { log.With().Panic("failed to close malfeasanceEmitter", log.Err(err)) } - - close(reporter.stopChan) + if err := reporter.events.emitter.Close(); err != nil { + log.With().Panic("failed to close eventsEmitter", log.Err(err)) + } reporter = nil } } diff --git a/go.mod b/go.mod index fa9ecb9aa0..177b6b1348 100644 --- a/go.mod +++ b/go.mod @@ -40,7 +40,7 @@ require ( github.com/santhosh-tekuri/jsonschema/v5 v5.3.1 github.com/seehuhn/mt19937 v1.0.0 github.com/slok/go-http-metrics v0.13.0 - github.com/spacemeshos/api/release/go v1.58.0 + github.com/spacemeshos/api/release/go v1.59.0 github.com/spacemeshos/economics v0.1.4 github.com/spacemeshos/fixed v0.1.2 github.com/spacemeshos/go-scale v1.2.1 @@ -262,6 +262,11 @@ require ( sigs.k8s.io/yaml v1.4.0 // indirect ) -// temporary until this issue is resolved and cloud.google.com/go/storage has been updated -// https://github.com/googleapis/google-cloud-go/issues/11283 -exclude google.golang.org/grpc v1.69.0 +exclude ( + // temporary until this issue is resolved and cloud.google.com/go/storage has been updated + // https://github.com/googleapis/google-cloud-go/issues/11283 + google.golang.org/grpc v1.68.2 + google.golang.org/grpc v1.69.0 + google.golang.org/grpc v1.69.1 + google.golang.org/grpc v1.69.2 +) diff --git a/go.sum b/go.sum index 8d1ea26365..0a3778d5c0 100644 --- a/go.sum +++ b/go.sum @@ -628,8 +628,8 @@ github.com/sourcegraph/annotate v0.0.0-20160123013949-f4cad6c6324d/go.mod h1:Udh github.com/sourcegraph/conc v0.3.0 h1:OQTbbt6P72L20UqAkXXuLOj79LfEanQ+YQFNpLA9ySo= github.com/sourcegraph/conc v0.3.0/go.mod h1:Sdozi7LEKbFPqYX2/J+iBAM6HpqSLTASQIKqDmF7Mt0= github.com/sourcegraph/syntaxhighlight v0.0.0-20170531221838-bd320f5d308e/go.mod h1:HuIsMU8RRBOtsCgI77wP899iHVBQpCmg4ErYMZB+2IA= -github.com/spacemeshos/api/release/go v1.58.0 h1:me2c3cPrdPiQ/+HqblSzhKYMeMKHKk+oNE/EaEN8LlE= -github.com/spacemeshos/api/release/go v1.58.0/go.mod h1:tJaxo2HaHFyL1726EmiCV1y6ab4LVwU2iB71bZkkfUQ= +github.com/spacemeshos/api/release/go v1.59.0 h1:/2K3kzVKdzypaRdc2uS4dsAL8TVXWXQr66KpEVeB4Pw= +github.com/spacemeshos/api/release/go v1.59.0/go.mod h1:tJaxo2HaHFyL1726EmiCV1y6ab4LVwU2iB71bZkkfUQ= github.com/spacemeshos/economics v0.1.4 h1:twlawrcQhYNqPgyDv08+24EL/OgUKz3d7q+PvJIAND0= github.com/spacemeshos/economics v0.1.4/go.mod h1:6HKWKiKdxjVQcGa2z/wA0LR4M/DzKib856bP16yqNmQ= github.com/spacemeshos/fixed v0.1.2 h1:pENQ8pXFAqin3f15ZLoOVVeSgcmcFJ0IFdFm4+9u4SM= diff --git a/malfeasance/handler.go b/malfeasance/handler.go index b15eb2aa1f..4f44c4b941 100644 --- a/malfeasance/handler.go +++ b/malfeasance/handler.go @@ -77,11 +77,11 @@ func (h *Handler) RegisterHandler(malfeasanceType MalfeasanceType, handler Malfe h.handlers[malfeasanceType] = handler } -func (h *Handler) reportMalfeasance(smesher types.NodeID, proof []byte) { +func (h *Handler) reportMalfeasance(smesher types.NodeID) { h.tortoise.OnMalfeasance(smesher) - events.ReportMalfeasance(smesher, proof) + events.ReportMalfeasance(smesher) if slices.Contains(h.nodeIDs, smesher) { - events.EmitOwnMalfeasanceProof(smesher, proof) + events.EmitOwnMalfeasanceProof(smesher) } } @@ -93,9 +93,14 @@ func (h *Handler) countInvalidProof(p *wire.MalfeasanceProof) { h.handlers[MalfeasanceType(p.Proof.Type)].ReportInvalidProof(numInvalidProofs) } -func (h *Handler) Info(data []byte) (map[string]string, error) { +func (h *Handler) Info(ctx context.Context, nodeID types.NodeID) (map[string]string, error) { + var blob sql.Blob + if err := identities.LoadMalfeasanceBlob(ctx, h.cdb, nodeID.Bytes(), &blob); err != nil { + return nil, fmt.Errorf("load malfeasance proof: %w", err) + } + var p wire.MalfeasanceProof - if err := codec.Decode(data, &p); err != nil { + if err := codec.Decode(blob.Bytes, &p); err != nil { return nil, fmt.Errorf("decode malfeasance proof: %w", err) } mh, ok := h.handlers[MalfeasanceType(p.Proof.Type)] @@ -170,7 +175,7 @@ func (h *Handler) HandleMalfeasanceProof(ctx context.Context, peer p2p.Peer, dat h.countInvalidProof(&p.MalfeasanceProof) return fmt.Errorf("%w: %s", pubsub.ErrValidationReject, err) } - h.reportMalfeasance(id, codec.MustEncode(&p.MalfeasanceProof)) + h.reportMalfeasance(id) // node saves malfeasance proof eagerly/atomically with the malicious data. // it has validated the proof before saving to db. h.countProof(&p.MalfeasanceProof) @@ -219,7 +224,7 @@ func (h *Handler) validateAndSave(ctx context.Context, p *wire.MalfeasanceProof) } return nodeID, err } - h.reportMalfeasance(nodeID, proofBytes) + h.reportMalfeasance(nodeID) h.cdb.CacheMalfeasanceProof(nodeID, proofBytes) h.countProof(p) h.logger.Debug("new malfeasance proof", diff --git a/malfeasance/handler_test.go b/malfeasance/handler_test.go index adb1e5af55..e08affc7de 100644 --- a/malfeasance/handler_test.go +++ b/malfeasance/handler_test.go @@ -20,6 +20,7 @@ import ( "github.com/spacemeshos/go-spacemesh/common/types" "github.com/spacemeshos/go-spacemesh/datastore" "github.com/spacemeshos/go-spacemesh/malfeasance/wire" + "github.com/spacemeshos/go-spacemesh/p2p" "github.com/spacemeshos/go-spacemesh/p2p/pubsub" "github.com/spacemeshos/go-spacemesh/sql" "github.com/spacemeshos/go-spacemesh/sql/identities" @@ -29,13 +30,14 @@ import ( type testMalfeasanceHandler struct { *Handler - db sql.StateDatabase - mockTrt *Mocktortoise + observedLogs *observer.ObservedLogs + db sql.StateDatabase + mockTrt *Mocktortoise } func newHandler(tb testing.TB) *testMalfeasanceHandler { db := statesql.InMemoryTest(tb) - observer, _ := observer.New(zapcore.WarnLevel) + observer, observedLogs := observer.New(zapcore.WarnLevel) logger := zaptest.NewLogger(tb, zaptest.WrapOptions(zap.WrapCore( func(core zapcore.Core) zapcore.Core { return zapcore.NewTee(core, observer) @@ -58,8 +60,9 @@ func newHandler(tb testing.TB) *testMalfeasanceHandler { return &testMalfeasanceHandler{ Handler: h, - db: db, - mockTrt: trt, + observedLogs: observedLogs, + db: db, + mockTrt: trt, } } @@ -253,15 +256,23 @@ func TestHandler_HandleSyncedMalfeasanceProof(t *testing.T) { }, } + expectedHash := types.RandomHash() h.mockTrt.EXPECT().OnMalfeasance(nodeID) err := h.HandleSyncedMalfeasanceProof( context.Background(), - types.RandomHash(), + expectedHash, "peer", codec.MustEncode(proof), ) require.ErrorIs(t, err, errWrongHash) require.ErrorIs(t, err, pubsub.ErrValidationReject) + + require.Equal(t, 1, h.observedLogs.Len()) + log := h.observedLogs.All()[0] + require.Equal(t, zap.WarnLevel, log.Level) + require.Contains(t, log.Message, "malfeasance proof for wrong identity") + require.Equal(t, expectedHash.ShortString(), log.ContextMap()["expected"]) + require.Equal(t, p2p.Peer("peer").String(), log.ContextMap()["peer"]) }) t.Run("invalid proof", func(t *testing.T) { @@ -374,11 +385,12 @@ func TestHandler_HandleSyncedMalfeasanceProof(t *testing.T) { } func TestHandler_Info(t *testing.T) { - t.Run("malformed data", func(t *testing.T) { + t.Run("unknown identity", func(t *testing.T) { h := newHandler(t) - info, err := h.Info(types.RandomBytes(32)) - require.ErrorContains(t, err, "decode malfeasance proof:") + info, err := h.Info(context.Background(), types.RandomNodeID()) + require.ErrorContains(t, err, "load malfeasance proof:") + require.ErrorIs(t, err, sql.ErrNotFound) require.Nil(t, info) }) @@ -392,9 +404,11 @@ func TestHandler_Info(t *testing.T) { Data: &wire.AtxProof{}, }, } + nodeID := types.RandomNodeID() proofBytes := codec.MustEncode(proof) + require.NoError(t, identities.SetMalicious(h.db, nodeID, proofBytes, time.Now())) - info, err := h.Info(proofBytes) + info, err := h.Info(context.Background(), nodeID) require.ErrorContains(t, err, fmt.Sprintf("unknown malfeasance type %d", wire.MultipleATXs)) require.Nil(t, info) }) @@ -414,9 +428,11 @@ func TestHandler_Info(t *testing.T) { Data: &wire.AtxProof{}, }, } + nodeID := types.RandomNodeID() proofBytes := codec.MustEncode(proof) + require.NoError(t, identities.SetMalicious(h.db, nodeID, proofBytes, time.Now())) - info, err := h.Info(proofBytes) + info, err := h.Info(context.Background(), nodeID) require.ErrorContains(t, err, "invalid proof") require.Nil(t, info) }) @@ -440,7 +456,10 @@ func TestHandler_Info(t *testing.T) { Data: &wire.AtxProof{}, }, } + nodeID := types.RandomNodeID() proofBytes := codec.MustEncode(proof) + require.NoError(t, identities.SetMalicious(h.db, nodeID, proofBytes, time.Now())) + expectedProperties := map[string]string{ "domain": "0", "type": strconv.FormatUint(uint64(wire.MultipleATXs), 10), @@ -449,7 +468,7 @@ func TestHandler_Info(t *testing.T) { expectedProperties[k] = v } - info, err := h.Info(proofBytes) + info, err := h.Info(context.Background(), nodeID) require.NoError(t, err) require.Equal(t, expectedProperties, info) }) diff --git a/malfeasance2/handler.go b/malfeasance2/handler.go new file mode 100644 index 0000000000..392299a8a1 --- /dev/null +++ b/malfeasance2/handler.go @@ -0,0 +1,44 @@ +package malfeasance2 + +import ( + "context" + + "go.uber.org/zap" + + "github.com/spacemeshos/go-spacemesh/common/types" + "github.com/spacemeshos/go-spacemesh/p2p" + "github.com/spacemeshos/go-spacemesh/signing" + "github.com/spacemeshos/go-spacemesh/sql" +) + +// nolint:unused +type Handler struct { + logger *zap.Logger + db sql.Executor + self p2p.Peer + nodeIDs []types.NodeID + edVerifier *signing.EdVerifier + tortoise tortoise +} + +func NewHandler( + db sql.Executor, + lg *zap.Logger, + self p2p.Peer, + nodeIDs []types.NodeID, + edVerifier *signing.EdVerifier, + tortoise tortoise, +) *Handler { + return &Handler{ + db: db, + logger: lg, + self: self, + nodeIDs: nodeIDs, + edVerifier: edVerifier, + tortoise: tortoise, + } +} + +func (h *Handler) Info(ctx context.Context, nodeID types.NodeID) (map[string]string, error) { + return nil, sql.ErrNotFound +} diff --git a/malfeasance2/interface.go b/malfeasance2/interface.go new file mode 100644 index 0000000000..12bd87e98a --- /dev/null +++ b/malfeasance2/interface.go @@ -0,0 +1,11 @@ +package malfeasance2 + +import ( + "github.com/spacemeshos/go-spacemesh/common/types" +) + +//go:generate mockgen -typed -package=malfeasance2 -destination=./mocks.go -source=./interface.go + +type tortoise interface { + OnMalfeasance(types.NodeID) +} diff --git a/malfeasance2/mocks.go b/malfeasance2/mocks.go new file mode 100644 index 0000000000..c1c68d418e --- /dev/null +++ b/malfeasance2/mocks.go @@ -0,0 +1,77 @@ +// Code generated by MockGen. DO NOT EDIT. +// Source: ./interface.go +// +// Generated by this command: +// +// mockgen -typed -package=malfeasance2 -destination=./mocks.go -source=./interface.go +// + +// Package malfeasance2 is a generated GoMock package. +package malfeasance2 + +import ( + reflect "reflect" + + types "github.com/spacemeshos/go-spacemesh/common/types" + gomock "go.uber.org/mock/gomock" +) + +// Mocktortoise is a mock of tortoise interface. +type Mocktortoise struct { + ctrl *gomock.Controller + recorder *MocktortoiseMockRecorder + isgomock struct{} +} + +// MocktortoiseMockRecorder is the mock recorder for Mocktortoise. +type MocktortoiseMockRecorder struct { + mock *Mocktortoise +} + +// NewMocktortoise creates a new mock instance. +func NewMocktortoise(ctrl *gomock.Controller) *Mocktortoise { + mock := &Mocktortoise{ctrl: ctrl} + mock.recorder = &MocktortoiseMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *Mocktortoise) EXPECT() *MocktortoiseMockRecorder { + return m.recorder +} + +// OnMalfeasance mocks base method. +func (m *Mocktortoise) OnMalfeasance(arg0 types.NodeID) { + m.ctrl.T.Helper() + m.ctrl.Call(m, "OnMalfeasance", arg0) +} + +// OnMalfeasance indicates an expected call of OnMalfeasance. +func (mr *MocktortoiseMockRecorder) OnMalfeasance(arg0 any) *MocktortoiseOnMalfeasanceCall { + mr.mock.ctrl.T.Helper() + call := mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "OnMalfeasance", reflect.TypeOf((*Mocktortoise)(nil).OnMalfeasance), arg0) + return &MocktortoiseOnMalfeasanceCall{Call: call} +} + +// MocktortoiseOnMalfeasanceCall wrap *gomock.Call +type MocktortoiseOnMalfeasanceCall struct { + *gomock.Call +} + +// Return rewrite *gomock.Call.Return +func (c *MocktortoiseOnMalfeasanceCall) Return() *MocktortoiseOnMalfeasanceCall { + c.Call = c.Call.Return() + return c +} + +// Do rewrite *gomock.Call.Do +func (c *MocktortoiseOnMalfeasanceCall) Do(f func(types.NodeID)) *MocktortoiseOnMalfeasanceCall { + c.Call = c.Call.Do(f) + return c +} + +// DoAndReturn rewrite *gomock.Call.DoAndReturn +func (c *MocktortoiseOnMalfeasanceCall) DoAndReturn(f func(types.NodeID)) *MocktortoiseOnMalfeasanceCall { + c.Call = c.Call.DoAndReturn(f) + return c +} diff --git a/node/node.go b/node/node.go index f8c88b015b..ab1c64c9af 100644 --- a/node/node.go +++ b/node/node.go @@ -61,6 +61,7 @@ import ( "github.com/spacemeshos/go-spacemesh/layerpatrol" "github.com/spacemeshos/go-spacemesh/log" "github.com/spacemeshos/go-spacemesh/malfeasance" + "github.com/spacemeshos/go-spacemesh/malfeasance2" "github.com/spacemeshos/go-spacemesh/mesh" "github.com/spacemeshos/go-spacemesh/metrics" "github.com/spacemeshos/go-spacemesh/metrics/public" @@ -378,50 +379,51 @@ func New(opts ...Option) *App { // App is the cli app singleton. type App struct { *cobra.Command - fileLock *flock.Flock - signers []*signing.EdSigner - Config *config.Config - db sql.StateDatabase - apiDB sql.StateDatabase - cachedDB *datastore.CachedDB - dbMetrics *dbmetrics.DBMetricsCollector - localDB sql.LocalDatabase - grpcPublicServer *grpcserver.Server - grpcPrivateServer *grpcserver.Server - grpcPostServer *grpcserver.Server - grpcTLSServer *grpcserver.Server - jsonAPIServer *grpcserver.JSONHTTPServer - grpcServices map[grpcserver.Service]grpcserver.ServiceAPI - pprofService *http.Server - profilerService *pyroscope.Profiler - syncer *syncer.Syncer - proposalBuilder *miner.ProposalBuilder - mesh *mesh.Mesh - atxsdata *atxsdata.Data - clock *timesync.NodeClock - hare3 *hare3.Hare - hare4 *hare4.Hare - hareResultsChan chan hare4.ConsensusOutput - hOracle *eligibility.Oracle - blockGen *blocks.Generator - certifier *blocks.Certifier - atxBuilder *activation.Builder - atxHandler *activation.Handler - txHandler *txs.TxHandler - validator *activation.Validator - edVerifier *signing.EdVerifier - beaconProtocol *beacon.ProtocolDriver - log log.Log - syncLogger log.Log - conState *txs.ConservativeState - fetcher *fetch.Fetch - ptimesync *peersync.Sync - updater *bootstrap.Updater - poetDb *activation.PoetDb - postVerifier activation.PostVerifier - postSupervisor *activation.PostSupervisor - malfeasanceHandler *malfeasance.Handler - errCh chan error + fileLock *flock.Flock + signers []*signing.EdSigner + Config *config.Config + db sql.StateDatabase + apiDB sql.StateDatabase + cachedDB *datastore.CachedDB + dbMetrics *dbmetrics.DBMetricsCollector + localDB sql.LocalDatabase + grpcPublicServer *grpcserver.Server + grpcPrivateServer *grpcserver.Server + grpcPostServer *grpcserver.Server + grpcTLSServer *grpcserver.Server + jsonAPIServer *grpcserver.JSONHTTPServer + grpcServices map[grpcserver.Service]grpcserver.ServiceAPI + pprofService *http.Server + profilerService *pyroscope.Profiler + syncer *syncer.Syncer + proposalBuilder *miner.ProposalBuilder + mesh *mesh.Mesh + atxsdata *atxsdata.Data + clock *timesync.NodeClock + hare3 *hare3.Hare + hare4 *hare4.Hare + hareResultsChan chan hare4.ConsensusOutput + hOracle *eligibility.Oracle + blockGen *blocks.Generator + certifier *blocks.Certifier + atxBuilder *activation.Builder + atxHandler *activation.Handler + txHandler *txs.TxHandler + validator *activation.Validator + edVerifier *signing.EdVerifier + beaconProtocol *beacon.ProtocolDriver + log log.Log + syncLogger log.Log + conState *txs.ConservativeState + fetcher *fetch.Fetch + ptimesync *peersync.Sync + updater *bootstrap.Updater + poetDb *activation.PoetDb + postVerifier activation.PostVerifier + postSupervisor *activation.PostSupervisor + malfeasanceHandler *malfeasance.Handler + malfeasance2Handler *malfeasance2.Handler + errCh chan error host *p2p.Host @@ -1161,18 +1163,27 @@ func (app *App) initServices(ctx context.Context) error { for _, s := range app.signers { nodeIDs = append(nodeIDs, s.NodeID()) } - app.malfeasanceHandler = malfeasance.NewHandler( + malHandler := malfeasance.NewHandler( app.cachedDB, malfeasanceLogger, app.host.ID(), nodeIDs, trtl, ) - app.malfeasanceHandler.RegisterHandler(malfeasance.MultipleATXs, activationMH) - app.malfeasanceHandler.RegisterHandler(malfeasance.MultipleBallots, meshMH) - app.malfeasanceHandler.RegisterHandler(malfeasance.HareEquivocation, hareMH) - app.malfeasanceHandler.RegisterHandler(malfeasance.InvalidPostIndex, invalidPostMH) - app.malfeasanceHandler.RegisterHandler(malfeasance.InvalidPrevATX, invalidPrevMH) + malHandler.RegisterHandler(malfeasance.MultipleATXs, activationMH) + malHandler.RegisterHandler(malfeasance.MultipleBallots, meshMH) + malHandler.RegisterHandler(malfeasance.HareEquivocation, hareMH) + malHandler.RegisterHandler(malfeasance.InvalidPostIndex, invalidPostMH) + malHandler.RegisterHandler(malfeasance.InvalidPrevATX, invalidPrevMH) + + malHandler2 := malfeasance2.NewHandler( + app.cachedDB, + malfeasanceLogger, + app.host.ID(), + nodeIDs, + app.edVerifier, + trtl, + ) fetcher.SetValidators( fetch.ValidatorFunc( @@ -1217,7 +1228,7 @@ func (app *App) initServices(ctx context.Context) error { ), fetch.ValidatorFunc( pubsub.DropPeerOnSyncValidationReject( - app.malfeasanceHandler.HandleSyncedMalfeasanceProof, + malHandler.HandleSyncedMalfeasanceProof, app.host, lg.Zap(), ), @@ -1278,7 +1289,7 @@ func (app *App) initServices(ctx context.Context) error { ) app.host.Register( pubsub.MalfeasanceProof, - pubsub.ChainGossipHandler(checkAtxSynced, app.malfeasanceHandler.HandleMalfeasanceProof), + pubsub.ChainGossipHandler(checkAtxSynced, malHandler.HandleMalfeasanceProof), ) app.proposalBuilder = proposalBuilder @@ -1286,6 +1297,8 @@ func (app *App) initServices(ctx context.Context) error { app.syncer = syncer app.atxBuilder = atxBuilder app.atxHandler = atxHandler + app.malfeasanceHandler = malHandler + app.malfeasance2Handler = malHandler2 app.poetDb = poetDb app.fetcher = fetcher app.beaconProtocol = beaconProtocol @@ -1573,11 +1586,11 @@ func (app *App) grpcService(svc grpcserver.Service, lg log.Log) (grpcserver.Serv app.grpcServices[svc] = service return service, nil case v2alpha1.Malfeasance: - service := v2alpha1.NewMalfeasanceService(app.apiDB, app.malfeasanceHandler) + service := v2alpha1.NewMalfeasanceService(app.apiDB, app.malfeasance2Handler, app.malfeasanceHandler) app.grpcServices[svc] = service return service, nil case v2alpha1.MalfeasanceStream: - service := v2alpha1.NewMalfeasanceStreamService(app.apiDB, app.malfeasanceHandler) + service := v2alpha1.NewMalfeasanceStreamService(app.apiDB, app.malfeasance2Handler, app.malfeasanceHandler) app.grpcServices[svc] = service return service, nil case v2alpha1.Network: diff --git a/sql/identities/identities.go b/sql/identities/identities.go index d0659fd982..99964b5de9 100644 --- a/sql/identities/identities.go +++ b/sql/identities/identities.go @@ -96,3 +96,16 @@ func AllMalicious(db sql.Executor) ([]types.NodeID, error) { } return ids, nil } + +// CountMalicious returns the number of malicious nodes. +func CountMalicious(db sql.Executor) (uint64, error) { + var count uint64 + _, err := db.Exec(` + SELECT COUNT(*) + FROM identities + `, nil, func(stmt *sql.Statement) bool { + count = uint64(stmt.ColumnInt64(0)) + return false + }) + return count, err +} diff --git a/sql/identities/identities_test.go b/sql/identities/identities_test.go index 8e357bc04f..06c7a60d73 100644 --- a/sql/identities/identities_test.go +++ b/sql/identities/identities_test.go @@ -70,20 +70,35 @@ func Test_GetMalicious(t *testing.T) { for i := 0; i < numBad; i++ { nid := types.NodeID{byte(i + 1)} bad = append(bad, nid) - require.NoError(t, identities.SetMalicious(db, nid, types.RandomBytes(11), time.Now().Local())) + require.NoError(t, identities.SetMalicious(db, nid, types.RandomBytes(11), time.Now())) } got, err = identities.AllMalicious(db) require.NoError(t, err) require.Equal(t, bad, got) } +func Test_CountMalicious(t *testing.T) { + db := statesql.InMemoryTest(t) + got, err := identities.CountMalicious(db) + require.NoError(t, err) + require.Zero(t, got) + + const numBad = uint64(11) + for range numBad { + require.NoError(t, identities.SetMalicious(db, types.RandomNodeID(), types.RandomBytes(11), time.Now())) + } + got, err = identities.CountMalicious(db) + require.NoError(t, err) + require.Equal(t, numBad, got) +} + func TestLoadMalfeasanceBlob(t *testing.T) { db := statesql.InMemoryTest(t) ctx := context.Background() nid1 := types.RandomNodeID() proof1 := types.RandomBytes(11) - identities.SetMalicious(db, nid1, proof1, time.Now().Local()) + require.NoError(t, identities.SetMalicious(db, nid1, proof1, time.Now())) var blob1 sql.Blob require.NoError(t, identities.LoadMalfeasanceBlob(ctx, db, nid1.Bytes(), &blob1)) @@ -95,7 +110,7 @@ func TestLoadMalfeasanceBlob(t *testing.T) { nid2 := types.RandomNodeID() proof2 := types.RandomBytes(12) - identities.SetMalicious(db, nid2, proof2, time.Now().Local()) + require.NoError(t, identities.SetMalicious(db, nid2, proof2, time.Now())) var blob2 sql.Blob require.NoError(t, identities.LoadMalfeasanceBlob(ctx, db, nid2.Bytes(), &blob2)) diff --git a/sql/malfeasance/malfeasance.go b/sql/malfeasance/malfeasance.go index c45c47895d..416743e153 100644 --- a/sql/malfeasance/malfeasance.go +++ b/sql/malfeasance/malfeasance.go @@ -6,23 +6,10 @@ import ( "github.com/spacemeshos/go-spacemesh/common/types" "github.com/spacemeshos/go-spacemesh/sql" + "github.com/spacemeshos/go-spacemesh/sql/builder" "github.com/spacemeshos/go-spacemesh/sql/marriage" ) -func IsMalicious(db sql.Executor, nodeID types.NodeID) (bool, error) { - rows, err := db.Exec(` - SELECT 1 - FROM malfeasance - WHERE pubkey = ?1 - `, func(stmt *sql.Statement) { - stmt.BindBytes(1, nodeID.Bytes()) - }, nil) - if err != nil { - return false, fmt.Errorf("is malicious %v: %w", nodeID, err) - } - return rows > 0, nil -} - func AddProof( db sql.Executor, nodeID types.NodeID, @@ -68,3 +55,43 @@ func SetMalicious(db sql.Executor, nodeID types.NodeID, marriageID marriage.ID, } return nil } + +func IsMalicious(db sql.Executor, nodeID types.NodeID) (bool, error) { + rows, err := db.Exec(` + SELECT 1 + FROM malfeasance + WHERE pubkey = ?1 + `, func(stmt *sql.Statement) { + stmt.BindBytes(1, nodeID.Bytes()) + }, nil) + if err != nil { + return false, fmt.Errorf("is malicious %v: %w", nodeID, err) + } + return rows > 0, nil +} + +func IterateOps( + db sql.Executor, + operations builder.Operations, + fn func(types.NodeID, []byte, int, time.Time) bool, +) error { + fullQuery := ` + SELECT pubkey, proof, domain, received + FROM malfeasance + ` + builder.FilterFrom(operations) + _, err := db.Exec(fullQuery, builder.BindingsFrom(operations), + func(stmt *sql.Statement) bool { + var id types.NodeID + stmt.ColumnBytes(0, id[:]) + var proof []byte + if stmt.ColumnLen(1) > 0 { + proof = make([]byte, stmt.ColumnLen(1)) + stmt.ColumnBytes(1, proof) + } + domain := int(stmt.ColumnInt64(2)) + received := time.Unix(0, stmt.ColumnInt64(3)) + return fn(id, proof, domain, received) + }, + ) + return err +} diff --git a/sql/malfeasance/malfeasance_test.go b/sql/malfeasance/malfeasance_test.go index 65bc13f5ab..e191bca7e6 100644 --- a/sql/malfeasance/malfeasance_test.go +++ b/sql/malfeasance/malfeasance_test.go @@ -1,12 +1,14 @@ package malfeasance_test import ( + "math/rand/v2" "testing" "time" "github.com/stretchr/testify/require" "github.com/spacemeshos/go-spacemesh/common/types" + "github.com/spacemeshos/go-spacemesh/sql/builder" "github.com/spacemeshos/go-spacemesh/sql/malfeasance" "github.com/spacemeshos/go-spacemesh/sql/marriage" "github.com/spacemeshos/go-spacemesh/sql/statesql" @@ -19,7 +21,7 @@ func TestAdd(t *testing.T) { t.Parallel() db := statesql.InMemoryTest(t) - err := malfeasance.AddProof(db, types.RandomNodeID(), nil, nil, 0, time.Now()) + err := malfeasance.AddProof(db, types.RandomNodeID(), nil, nil, 1, time.Now()) require.Error(t, err) }) @@ -27,7 +29,7 @@ func TestAdd(t *testing.T) { t.Parallel() db := statesql.InMemoryTest(t) - err := malfeasance.AddProof(db, types.RandomNodeID(), nil, types.RandomBytes(100), 0, time.Now()) + err := malfeasance.AddProof(db, types.RandomNodeID(), nil, types.RandomBytes(100), 1, time.Now()) require.NoError(t, err) }) @@ -36,7 +38,7 @@ func TestAdd(t *testing.T) { db := statesql.InMemoryTest(t) id := marriage.ID(100) - err := malfeasance.AddProof(db, types.RandomNodeID(), &id, nil, 0, time.Now()) + err := malfeasance.AddProof(db, types.RandomNodeID(), &id, nil, 1, time.Now()) require.Error(t, err) }) @@ -58,7 +60,7 @@ func TestAdd(t *testing.T) { }) require.NoError(t, err) - err = malfeasance.AddProof(db, types.RandomNodeID(), &id, nil, 0, time.Now()) + err = malfeasance.AddProof(db, types.RandomNodeID(), &id, nil, 1, time.Now()) require.NoError(t, err) }) @@ -80,37 +82,11 @@ func TestAdd(t *testing.T) { }) require.NoError(t, err) - err = malfeasance.AddProof(db, types.RandomNodeID(), &id, types.RandomBytes(100), 0, time.Now()) + err = malfeasance.AddProof(db, types.RandomNodeID(), &id, types.RandomBytes(100), 1, time.Now()) require.NoError(t, err) }) } -func TestIsMalicious(t *testing.T) { - t.Parallel() - - t.Run("unknown node is not malicious", func(t *testing.T) { - t.Parallel() - db := statesql.InMemoryTest(t) - - mal, err := malfeasance.IsMalicious(db, types.RandomNodeID()) - require.NoError(t, err) - require.False(t, mal) - }) - - t.Run("known node is malicious", func(t *testing.T) { - t.Parallel() - db := statesql.InMemoryTest(t) - - nodeID := types.RandomNodeID() - err := malfeasance.AddProof(db, nodeID, nil, types.RandomBytes(100), 0, time.Now()) - require.NoError(t, err) - - mal, err := malfeasance.IsMalicious(db, nodeID) - require.NoError(t, err) - require.True(t, mal) - }) -} - func TestSetMalicious(t *testing.T) { t.Parallel() @@ -158,7 +134,7 @@ func TestSetMalicious(t *testing.T) { db := statesql.InMemoryTest(t) nodeID := types.RandomNodeID() - err := malfeasance.AddProof(db, nodeID, nil, types.RandomBytes(100), 0, time.Now()) + err := malfeasance.AddProof(db, nodeID, nil, types.RandomBytes(100), 1, time.Now()) require.NoError(t, err) err = malfeasance.SetMalicious(db, nodeID, marriage.ID(0), time.Now()) @@ -177,7 +153,7 @@ func TestSetMalicious(t *testing.T) { require.NoError(t, err) nodeID := types.RandomNodeID() - err = malfeasance.AddProof(db, nodeID, nil, types.RandomBytes(100), 0, time.Now()) + err = malfeasance.AddProof(db, nodeID, nil, types.RandomBytes(100), 1, time.Now()) require.NoError(t, err) err = marriage.Add(db, marriage.Info{ @@ -198,3 +174,135 @@ func TestSetMalicious(t *testing.T) { require.True(t, mal) }) } + +func TestIsMalicious(t *testing.T) { + t.Parallel() + + t.Run("unknown node is not malicious", func(t *testing.T) { + t.Parallel() + db := statesql.InMemoryTest(t) + + mal, err := malfeasance.IsMalicious(db, types.RandomNodeID()) + require.NoError(t, err) + require.False(t, mal) + }) + + t.Run("known node is malicious", func(t *testing.T) { + t.Parallel() + db := statesql.InMemoryTest(t) + + nodeID := types.RandomNodeID() + err := malfeasance.AddProof(db, nodeID, nil, types.RandomBytes(100), 1, time.Now()) + require.NoError(t, err) + + mal, err := malfeasance.IsMalicious(db, nodeID) + require.NoError(t, err) + require.True(t, mal) + }) +} + +func Test_IterateMaliciousOps(t *testing.T) { + db := statesql.InMemoryTest(t) + tt := []struct { + id types.NodeID + proof []byte + domain int + }{ + { + types.RandomNodeID(), + types.RandomBytes(11), + rand.IntN(255), + }, + { + types.RandomNodeID(), + types.RandomBytes(11), + rand.IntN(255), + }, + { + types.RandomNodeID(), + types.RandomBytes(11), + rand.IntN(255), + }, + } + + for _, tc := range tt { + err := malfeasance.AddProof(db, tc.id, nil, tc.proof, tc.domain, time.Now()) + require.NoError(t, err) + } + + var got []struct { + id types.NodeID + proof []byte + domain int + } + err := malfeasance.IterateOps(db, builder.Operations{}, + func(id types.NodeID, proof []byte, domain int, _ time.Time) bool { + got = append(got, struct { + id types.NodeID + proof []byte + domain int + }{id, proof, domain}) + return true + }) + require.NoError(t, err) + require.ElementsMatch(t, tt, got) +} + +func Test_IterateMaliciousOps_Married(t *testing.T) { + db := statesql.InMemoryTest(t) + + nodeID := types.RandomNodeID() + marriageATX := types.RandomATXID() + id, err := marriage.NewID(db) + require.NoError(t, err) + + err = marriage.Add(db, marriage.Info{ + ID: id, + NodeID: nodeID, + ATX: marriageATX, + MarriageIndex: 0, + Target: nodeID, + Signature: types.RandomEdSignature(), + }) + require.NoError(t, err) + + ids := make([]types.NodeID, 5) + ids[0] = nodeID + proof := types.RandomBytes(11) + err = malfeasance.AddProof(db, ids[0], &id, proof, 1, time.Now()) + require.NoError(t, err) + + for i := 1; i < len(ids); i++ { + ids[i] = types.RandomNodeID() + err := malfeasance.SetMalicious(db, ids[i], id, time.Now()) + require.NoError(t, err) + } + + var got []struct { + id types.NodeID + proof []byte + domain int + } + err = malfeasance.IterateOps(db, builder.Operations{}, + func(id types.NodeID, proof []byte, domain int, _ time.Time) bool { + got = append(got, struct { + id types.NodeID + proof []byte + domain int + }{id, proof, domain}) + return true + }) + require.NoError(t, err) + require.Equal(t, len(ids), len(got)) + require.Equal(t, ids[0], got[0].id) + require.Equal(t, proof, got[0].proof) + require.Equal(t, 1, got[0].domain) + + ids = ids[1:] + got = got[1:] + for i, id := range ids { + require.Equal(t, id, got[i].id) + require.Nil(t, got[i].proof) + require.Zero(t, got[i].domain) + } +}