BCI-3492 [LogPoller]: Allow withObservedExecAndRowsAffected to report non-zero rows affected #14057
@@ -0,0 +1,5 @@
---
"chainlink": minor
---

#bugfix Addresses 2 minor issues with the pruning of LogPoller's db tables: logs not matching any filter will now be pruned, and rows deleted are now properly reported for observability
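A quick illustration of the observability half of the changeset: the exec wrapper needs to surface the row count from `sql.Result` rather than dropping it. The sketch below is hypothetical; it borrows the name `withObservedExecAndRowsAffected` from the PR title, but the `execer` interface, the `observe` callback, and the signature are stand-ins for the real instrumentation, not the actual implementation.

```go
package example

import (
	"context"
	"database/sql"
	"fmt"
)

// execer is the minimal surface this sketch needs; in the real code this
// would be the ORM's data source.
type execer interface {
	ExecContext(ctx context.Context, query string, args ...any) (sql.Result, error)
}

// withObservedExecAndRowsAffected (hypothetical sketch): run the statement,
// then pass the real RowsAffected count to the observability callback
// instead of losing it, and return it to the caller.
func withObservedExecAndRowsAffected(ctx context.Context, ds execer, query string,
	observe func(rowsAffected int64), args ...any) (int64, error) {
	result, err := ds.ExecContext(ctx, query, args...)
	if err != nil {
		return 0, err
	}
	rowsAffected, err := result.RowsAffected()
	if err != nil {
		return 0, fmt.Errorf("failed to get rows affected: %w", err)
	}
	observe(rowsAffected)
	return rowsAffected, nil
}
```

With this shape, a pruning query like DeleteExpiredLogs can both record and return the number of rows it actually removed.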
@@ -15,6 +15,7 @@ import (
	"github.com/smartcontractkit/chainlink-common/pkg/types/query"

	"github.com/smartcontractkit/chainlink-common/pkg/sqlutil"

	evmtypes "github.com/smartcontractkit/chainlink/v2/core/chains/evm/types"

	"github.com/smartcontractkit/chainlink/v2/core/chains/evm/types"

@@ -313,34 +314,29 @@ type Exp struct {
	ShouldDelete bool
}

// DeleteExpiredLogs removes any logs which either:
// - don't match any currently registered filters, or
// - have a timestamp older than any matching filter's retention, UNLESS there is at
//   least one matching filter with retention=0
func (o *DSORM) DeleteExpiredLogs(ctx context.Context, limit int64) (int64, error) {
Is it possible to accidentally remove logs when a job is updated? Let's imagine the following scenario

I wonder if this is something that could happen or impact the system. How does it work for other products? What if a job is removed and added in a two-step process (via API)? AFAIK CLO does this entire flow in a large single db transaction, but I guess it might change at some point because of the LOOPPs architecture, right?

Good question! I'm pretty sure it's okay for now, since as you say CLO does the whole flow in a single db transaction. Someone could manually remove a job, and then add a similar one later... but I think it would be a natural user expectation to assume that there are no guarantees about events the old job cared about still being there for the new job to use. If CLO changes so that it's no longer in a single db transaction, that seems more problematic, since it's presented to the user as an "Update" rather than a "Remove" followed by a separate "Create". But I think this would cause similar problems with or without this change. One way or another, we'll have to solve some issues if we can no longer guarantee the ability for CLO to make atomic job updates.

Any plugin which cares about past logs already does a replay to the appropriate block when it's initialized. (For example, going back to pick up an OCR2 configuration event.) So I think the only issue is with missing logs that get emitted around the time the old job is getting swapped out with the new job. But even without this change, there is a potential for missing some logs while that's happening. This would just increase the likelihood of it happening, since it would apply not just to events emitted between the delete and create operations but also those emitted shortly before the delete, where the old job hadn't had time to process them before being deleted. One way I can think of for how we might be able to solve this issue is with ChainReader. If it can expose a method for updating a filter that does UnregisterFilter and then RegisterFilter within the same db transaction, that should solve it, I think. We might already even be doing this with Bind(), I'm not sure, but @ilija42 should know.

Sounds good. One thing that comes to my mind that might work as an additional safety check is to wait for some "sane" threshold before deleting logs without the filter. When

Yes, I was thinking that might be what we have to do, if we can't solve things with ChainReader.

Actually, I just thought of a simple solution which I think would be pretty robust: in
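To make the UnregisterFilter-then-RegisterFilter idea from this thread concrete, here is a rough sketch of an atomic filter swap using plain database/sql transactions. The helper name, its signature, and the assumed column set of evm.log_poller_filters are illustrative only; this is not the existing ChainReader or LogPoller API.

```go
package example

import (
	"context"
	"database/sql"
)

// updateFilterAtomically (illustrative only): replace a log filter inside a
// single transaction so there is never a window in which its logs appear
// unmatched and eligible for pruning.
func updateFilterAtomically(ctx context.Context, db *sql.DB, chainID string,
	name string, address []byte, event []byte, retention int64) error {
	tx, err := db.BeginTx(ctx, nil)
	if err != nil {
		return err
	}
	defer tx.Rollback() // no-op once the transaction has been committed

	// Unregister the old filter...
	if _, err = tx.ExecContext(ctx,
		`DELETE FROM evm.log_poller_filters WHERE evm_chain_id = $1 AND name = $2`,
		chainID, name); err != nil {
		return err
	}
	// ...and register its replacement before anything else can observe the gap.
	if _, err = tx.ExecContext(ctx,
		`INSERT INTO evm.log_poller_filters (evm_chain_id, name, address, event, retention)
		 VALUES ($1, $2, $3, $4, $5)`,
		chainID, name, address, event, retention); err != nil {
		return err
	}
	return tx.Commit()
}
```

Because the delete and the insert commit together, a concurrent pruning pass sees either the old filter or the new one, never a window in which the logs appear unmatched.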
	var err error
	var result sql.Result
	if limit > 0 {
		result, err = o.ds.ExecContext(ctx, `
		DELETE FROM evm.logs
	query := `DELETE FROM evm.logs
		WHERE (evm_chain_id, address, event_sig, block_number) IN (
			SELECT l.evm_chain_id, l.address, l.event_sig, l.block_number
			FROM evm.logs l
			INNER JOIN (
				SELECT address, event, MAX(retention) AS retention
			LEFT JOIN (
				SELECT address, event, CASE WHEN MIN(retention) = 0 THEN 0 ELSE MAX(retention) END AS retention
				FROM evm.log_poller_filters
				WHERE evm_chain_id = $1
				GROUP BY evm_chain_id, address, event
				HAVING NOT 0 = ANY(ARRAY_AGG(retention))
			) r ON l.evm_chain_id = $1 AND l.address = r.address AND l.event_sig = r.event
			AND l.block_timestamp <= STATEMENT_TIMESTAMP() - (r.retention / 10^9 * interval '1 second')
			LIMIT $2
		)`, ubig.New(o.chainID), limit)
		WHERE r.retention IS NULL OR (r.retention != 0 AND l.block_timestamp <= STATEMENT_TIMESTAMP() - (r.retention / 10^9 * interval '1 second')) %s)`

	if limit > 0 {
		result, err = o.ds.ExecContext(ctx, fmt.Sprintf(query, "LIMIT $2"), ubig.New(o.chainID), limit)
	} else {
		result, err = o.ds.ExecContext(ctx, `WITH r AS
			( SELECT address, event, MAX(retention) AS retention
				FROM evm.log_poller_filters WHERE evm_chain_id=$1
				GROUP BY evm_chain_id,address, event HAVING NOT 0 = ANY(ARRAY_AGG(retention))
			) DELETE FROM evm.logs l USING r
			WHERE l.evm_chain_id = $1 AND l.address=r.address AND l.event_sig=r.event
			AND l.block_timestamp <= STATEMENT_TIMESTAMP() - (r.retention / 10^9 * interval '1 second')`, // retention is in nanoseconds (time.Duration aka BIGINT)
			ubig.New(o.chainID))
		result, err = o.ds.ExecContext(ctx, fmt.Sprintf(query, ""), ubig.New(o.chainID))
	}

	if err != nil {
Nice catch!
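For anyone following the SQL change above, the new deletion predicate can be restated in Go. This is only an illustrative restatement (not code from the PR), mirroring the doc comment on DeleteExpiredLogs: prune a log when no registered filter matches it, or when every matching filter's retention has elapsed and none of them uses retention=0, which means keep forever.

```go
package example

import "time"

// shouldDelete is an illustrative Go restatement of the rewritten DELETE
// predicate. matchingRetentions holds the retention of every registered
// filter matching the log's (address, event_sig); each value is a
// time.Duration, stored as nanoseconds in the BIGINT column.
func shouldDelete(now, blockTimestamp time.Time, matchingRetentions []time.Duration) bool {
	if len(matchingRetentions) == 0 {
		return true // no matching filter => r.retention IS NULL => prune
	}
	maxRetention := time.Duration(0)
	for _, r := range matchingRetentions {
		if r == 0 {
			return false // retention=0 means keep forever (CASE WHEN MIN(retention) = 0)
		}
		if r > maxRetention {
			maxRetention = r
		}
	}
	// block_timestamp <= STATEMENT_TIMESTAMP() - (MAX(retention) as an interval)
	return !blockTimestamp.After(now.Add(-maxRetention))
}
```

The switch from INNER JOIN to LEFT JOIN is what enables the first branch: previously, logs with no matching filter never entered the candidate set, so they were never pruned.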