Skip to content

Commit

Permalink
fix malfeasance proof metrics and add logging (#4068)
Browse files Browse the repository at this point in the history
## Motivation
<!-- Please mention the issue fixed by this PR or detailed motivation -->
part of #4067
<!-- `Closes #XXXX, closes #XXXX, ...` links mentioned issues to this PR and automatically closes them when this it's merged -->

## Changes
<!-- Please describe in detail the changes made -->
- malfeasance proof: only update metrics when save is successful
- add more logging to malfeasance proof upon generation and reception
- make atx handler uses just 1 mutex, not 2
- rename all miner_id, atx_node_id, sender_Id logging field to just smesher
- fix benchmark tests in atx handler
  • Loading branch information
countvonzero committed Feb 19, 2023
1 parent f1608a1 commit 8336a0f
Show file tree
Hide file tree
Showing 24 changed files with 291 additions and 199 deletions.
4 changes: 4 additions & 0 deletions Makefile-gpu.Inc
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,10 @@ $(BINDIR_GPU_SETUP_LIBS): $(PROJ_DIR)$(GPU_SETUP_ZIP)
$(PROJ_DIR)$(GPU_SETUP_ZIP):
curl -L $(GPU_SETUP_URL_ZIP) -o $(PROJ_DIR)$(GPU_SETUP_ZIP)

go-env-test: get-gpu-setup
go env -w CGO_LDFLAGS="$(CGO_TEST_LDFLAGS)"
.PHONY: go-env-test

get-gpu-setup: $(PROJ_DIR)$(GPU_SETUP_ZIP) $(BINDIR_GPU_SETUP_LIBS)
.PHONY: get-gpu-setup

Expand Down
4 changes: 0 additions & 4 deletions Makefile-svm.Inc
Original file line number Diff line number Diff line change
Expand Up @@ -62,10 +62,6 @@ go-env: get-gpu-setup
go env -w CGO_LDFLAGS="$(CGO_LDFLAGS)"
.PHONY: go-env

go-env-test: get-gpu-setup
go env -w CGO_LDFLAGS="$(CGO_TEST_LDFLAGS)"
.PHONY: go-env-test

print-ldflags: get-gpu-setup
@echo $(CGO_LDFLAGS)
.PHONY: print-ldflags
Expand Down
43 changes: 20 additions & 23 deletions activation/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,6 @@ type atxChan struct {

// Handler processes the atxs received from all nodes and their validity status.
type Handler struct {
sync.RWMutex

cdb *datastore.CachedDB
clock layerClock
publisher pubsub.Publisher
Expand All @@ -45,7 +43,7 @@ type Handler struct {
nipostValidator nipostValidator
atxReceivers []AtxReceiver
log log.Log
processAtxMutex sync.Mutex
mu sync.Mutex
atxChannels map[types.ATXID]*atxChan
fetcher system.Fetcher
}
Expand Down Expand Up @@ -86,8 +84,8 @@ func init() {

// AwaitAtx returns a channel that will receive notification when the specified atx with id is received via gossip.
func (h *Handler) AwaitAtx(id types.ATXID) chan struct{} {
h.Lock()
defer h.Unlock()
h.mu.Lock()
defer h.mu.Unlock()

if has, err := atxs.Has(h.cdb, id); err == nil && has {
return closedChan
Expand All @@ -107,8 +105,8 @@ func (h *Handler) AwaitAtx(id types.ATXID) chan struct{} {

// UnsubscribeAtx un subscribes the waiting for a specific atx with atx id id to arrive via gossip.
func (h *Handler) UnsubscribeAtx(id types.ATXID) {
h.Lock()
defer h.Unlock()
h.mu.Lock()
defer h.mu.Unlock()

ch, found := h.atxChannels[id]
if !found {
Expand All @@ -125,8 +123,8 @@ func (h *Handler) UnsubscribeAtx(id types.ATXID) {
//
// ATXs received as input must be already syntactically valid. Only contextual validation is performed.
func (h *Handler) ProcessAtx(ctx context.Context, atx *types.VerifiedActivationTx) error {
h.processAtxMutex.Lock()
defer h.processAtxMutex.Unlock()
h.mu.Lock()
defer h.mu.Unlock()

existingATX, _ := h.cdb.GetAtxHeader(atx.ID())
if existingATX != nil { // Already processed
Expand All @@ -135,19 +133,19 @@ func (h *Handler) ProcessAtx(ctx context.Context, atx *types.VerifiedActivationT
h.log.WithContext(ctx).With().Info("processing atx",
atx.ID(),
atx.PublishEpoch(),
log.FieldNamed("atx_node_id", atx.NodeID()),
log.FieldNamed("smesher", atx.NodeID()),
atx.PubLayerID,
)
if err := h.ContextuallyValidateAtx(atx); err != nil {
h.log.WithContext(ctx).With().Warning("atx failed contextual validation",
atx.ID(),
log.FieldNamed("atx_node_id", atx.NodeID()),
log.FieldNamed("smesher", atx.NodeID()),
log.Err(err),
)
} else {
h.log.WithContext(ctx).With().Info("atx is valid", atx.ID())
}
if err := h.StoreAtx(ctx, atx); err != nil {
if err := h.storeAtx(ctx, atx); err != nil {
return fmt.Errorf("cannot store atx %s: %w", atx.ShortString(), err)
}

Expand Down Expand Up @@ -253,7 +251,7 @@ func (h *Handler) validateNonInitialAtx(ctx context.Context, atx *types.Activati
if atx.NumUnits > prevAtx.NumUnits && nonce == nil {
h.log.WithContext(ctx).With().Info("PoST size increased without new VRF Nonce, re-validating current nonce",
atx.ID(),
log.FieldNamed("atx_node_id", atx.NodeID()),
log.FieldNamed("smesher", atx.NodeID()),
)

current, err := h.cdb.VRFNonce(atx.NodeID(), atx.TargetEpoch())
Expand Down Expand Up @@ -309,7 +307,7 @@ func (h *Handler) ContextuallyValidateAtx(atx *types.VerifiedActivationTx) error

if err == nil && atx.PrevATXID == *types.EmptyATXID {
// no previous atx declared, but already seen at least one atx from node
return fmt.Errorf("no prevATX reported, but other atx with same nodeID (%v) found: %v", atx.NodeID().ShortString(), lastAtx.ShortString())
return fmt.Errorf("no prevATX reported, but other atx with same nodeID (%v) found: %v", atx.NodeID(), lastAtx.ShortString())
}

if err == nil && atx.PrevATXID != lastAtx {
Expand All @@ -326,7 +324,7 @@ func (h *Handler) ContextuallyValidateAtx(atx *types.VerifiedActivationTx) error
// no previous atx found but previous atx referenced
h.log.With().Error("could not fetch node last atx",
atx.ID(),
log.FieldNamed("atx_node_id", atx.NodeID()),
log.FieldNamed("smesher", atx.NodeID()),
log.Err(err),
)
return fmt.Errorf("could not fetch node last atx: %w", err)
Expand All @@ -335,11 +333,8 @@ func (h *Handler) ContextuallyValidateAtx(atx *types.VerifiedActivationTx) error
return err
}

// StoreAtx stores an ATX and notifies subscribers of the ATXID.
func (h *Handler) StoreAtx(ctx context.Context, atx *types.VerifiedActivationTx) error {
h.Lock()
defer h.Unlock()

// storeAtx stores an ATX and notifies subscribers of the ATXID.
func (h *Handler) storeAtx(ctx context.Context, atx *types.VerifiedActivationTx) error {
malicious, err := h.cdb.IsMalicious(atx.NodeID())
if err != nil {
return fmt.Errorf("checking if node is malicious: %w", err)
Expand All @@ -351,7 +346,8 @@ func (h *Handler) StoreAtx(ctx context.Context, atx *types.VerifiedActivationTx)
if err != nil && !errors.Is(err, sql.ErrNotFound) {
return err
}
if prev != nil {
// do ID check to be absolutely sure.
if prev != nil && prev.ID() != atx.ID() {
var atxProof types.AtxProof
for i, a := range []*types.VerifiedActivationTx{prev, atx} {
atxProof.Messages[i] = types.AtxProofMsg{
Expand All @@ -371,7 +367,8 @@ func (h *Handler) StoreAtx(ctx context.Context, atx *types.VerifiedActivationTx)
}
h.log.With().Warning("smesher produced more than one atx in the same epoch",
log.Stringer("smesher", atx.NodeID()),
log.Inline(atx),
log.Object("prev", prev),
log.Object("curr", atx),
)
}
}
Expand Down Expand Up @@ -517,7 +514,7 @@ func (h *Handler) handleAtxData(ctx context.Context, peer p2p.Peer, data []byte)
r.OnAtx(header)
}
events.ReportNewActivation(vAtx)
logger.With().Info("new atx", log.Inline(atx), log.Int("size", len(data)))
logger.With().Info("new atx", log.Inline(vAtx), log.Int("size", len(data)))
return nil
}

Expand Down
Loading

0 comments on commit 8336a0f

Please sign in to comment.