diff --git a/harmony/harmonydb/sql/20240823-ipni.sql b/harmony/harmonydb/sql/20240823-ipni.sql index e2020e284..5a1505868 100644 --- a/harmony/harmonydb/sql/20240823-ipni.sql +++ b/harmony/harmonydb/sql/20240823-ipni.sql @@ -12,6 +12,9 @@ CREATE TABLE ipni ( -- metadata column in not required as Curio only supports one type of metadata(HTTP) is_rm BOOLEAN NOT NULL, + -- skip added in 20241106-market-fixes.sql + -- is_skip BOOLEAN NOT NULL DEFAULT FALSE, -- set to true means return 404 for related entries + previous TEXT, -- previous ad will only be null for first ad in chain provider TEXT NOT NULL, -- peerID from libp2p, this is main identifier on IPNI side @@ -33,7 +36,10 @@ CREATE INDEX ipni_provider_order_number ON ipni(provider, order_number); CREATE UNIQUE INDEX ipni_ad_cid ON ipni(ad_cid); -- This index will speed up lookups based on the ad_cid, which is frequently used to identify specific ads -CREATE UNIQUE INDEX ipni_context_id ON ipni(context_id, ad_cid, is_rm); +CREATE UNIQUE INDEX ipni_context_id ON ipni(context_id, ad_cid, is_rm); -- dropped in 20241106-market-fixes.sql +-- 20241106-market-fixes.sql: +-- CREATE INDEX ipni_context_id ON ipni(context_id, ad_cid, is_rm, is_skip) -- non-unique to allow multiple skips +-- CREATE INDEX ipni_entries_skip ON ipni(entries, is_skip, piece_cid); -- Since the get_ad_chain function relies on both provider and ad_cid to find the order_number, this index will optimize that query: CREATE INDEX ipni_provider_ad_cid ON ipni(provider, ad_cid); diff --git a/harmony/harmonydb/sql/20241106-market-fixes.sql b/harmony/harmonydb/sql/20241106-market-fixes.sql index 8b4f9f834..f98ca4592 100644 --- a/harmony/harmonydb/sql/20241106-market-fixes.sql +++ b/harmony/harmonydb/sql/20241106-market-fixes.sql @@ -10,4 +10,14 @@ alter table market_mk12_deals add proposal_cid text not null; CREATE INDEX market_mk12_deals_proposal_cid_index - ON market_mk12_deals (proposal_cid); \ No newline at end of file + ON market_mk12_deals 
(proposal_cid); + + +-- Add the is_skip column to the ipni table +ALTER TABLE ipni + ADD COLUMN is_skip BOOLEAN NOT NULL DEFAULT FALSE; -- set to true means return 404 for related entries + +DROP INDEX IF EXISTS ipni_context_id; +CREATE INDEX ipni_context_id ON ipni(context_id, ad_cid, is_rm, is_skip); + +CREATE INDEX ipni_entries_skip ON ipni(entries, is_skip, piece_cid); \ No newline at end of file diff --git a/market/ipni/chunker/serve-chunker.go b/market/ipni/chunker/serve-chunker.go index 351d2624d..1cb0655f8 100644 --- a/market/ipni/chunker/serve-chunker.go +++ b/market/ipni/chunker/serve-chunker.go @@ -6,6 +6,7 @@ import ( "context" "encoding/hex" "errors" + "github.com/yugabyte/pgx/v5" "io" "time" @@ -33,6 +34,8 @@ var ( ErrNotFound = errors.New("not found") ) +const NoSkipCacheTTL = 3 * time.Minute + type ipniEntry struct { Data []byte Prev cid.Cid @@ -45,6 +48,9 @@ type ServeChunker struct { cpr *cachedreader.CachedPieceReader entryCache *lru.Cache[cid.Cid, *promise.Promise[result.Result[ipniEntry]]] + + // small cache keeping track of which piece CIDs shouldn't be skipped. 
Entries expire after NoSkipCacheTTL
+	noSkipCache *lru.Cache[cid.Cid, time.Time]
 }

 // Entries are 0.5MiB in size, so we do ~10MiB of caching here
@@ -52,14 +58,14 @@ type ServeChunker struct {
 const EntryCacheSize = 20

 func NewServeChunker(db *harmonydb.DB, pieceProvider *pieceprovider.PieceProvider, indexStore *indexstore.IndexStore, cpr *cachedreader.CachedPieceReader) *ServeChunker {
-	entryCache := must.One(lru.New[cid.Cid, *promise.Promise[result.Result[ipniEntry]]](EntryCacheSize))
-
 	return &ServeChunker{
 		db:            db,
 		pieceProvider: pieceProvider,
 		indexStore:    indexStore,
 		cpr:           cpr,
-		entryCache:    entryCache,
+
+		entryCache:  must.One(lru.New[cid.Cid, *promise.Promise[result.Result[ipniEntry]]](EntryCacheSize)),
+		noSkipCache: must.One(lru.New[cid.Cid, time.Time](EntryCacheSize)),
 	}
 }

@@ -102,6 +108,9 @@ func (p *ServeChunker) getEntry(rctx context.Context, block cid.Cid, speculated
 		if v.Error == nil {
 			prevChunk = v.Value.Prev
 			return v.Value.Data, nil
+		} else if errors.Is(v.Error, ErrNotFound) {
+			log.Errorw("Cached promise skip", "block", block, "prev", prevChunk, "err", v.Error)
+			return v.Value.Data, v.Error
 		}
 		log.Errorw("Error in cached promise", "block", block, "error", v.Error)
 	}
@@ -159,12 +168,24 @@ func (p *ServeChunker) getEntry(rctx context.Context, block cid.Cid, speculated
 	}

 	chunk := ipniChunks[0]
-
 	pieceCid, err := cid.Parse(chunk.PieceCID)
 	if err != nil {
 		return nil, xerrors.Errorf("parsing piece CID: %w", err)
 	}

+	if leave, ok := p.noSkipCache.Get(pieceCid); !ok || time.Now().After(leave) {
+		skip, err := p.checkIsEntrySkip(ctx, block)
+		if err != nil {
+			return nil, xerrors.Errorf("checking entry skipped for block %s: %w", block, err)
+		}
+		if skip {
+			log.Warnw("Skipped entry skipped for block", "block", block)
+			return nil, ErrNotFound
+		}
+		// Refresh only after a real DB check so the entry truly expires after NoSkipCacheTTL.
+		p.noSkipCache.Add(pieceCid, time.Now().Add(NoSkipCacheTTL))
+	}
+
 	var next ipld.Link
 	if chunk.PrevCID != nil {
 		prevChunk, err = cid.Parse(*chunk.PrevCID)
@@ -298,3 +319,17 @@ func (p *ServeChunker)
reconstructChunkFromDB(ctx context.Context, chunk, piece

 	return b.Bytes(), nil
 }
+
+// checkIsEntrySkip reports whether the ad owning the entries head is marked is_skip; no matching row means false.
+func (p *ServeChunker) checkIsEntrySkip(ctx context.Context, entry cid.Cid) (bool, error) {
+	// Served by: CREATE INDEX ipni_entries_skip ON ipni(entries, is_skip, piece_cid);
+	var isSkip bool
+	err := p.db.QueryRow(ctx, `SELECT is_skip FROM ipni WHERE entries = $1`, entry.String()).Scan(&isSkip)
+	if err != nil {
+		if errors.Is(err, pgx.ErrNoRows) {
+			return false, nil
+		}
+		return false, err
+	}
+	return isSkip, nil
+}
diff --git a/market/ipni/ipni-provider/ipni-provider.go b/market/ipni/ipni-provider/ipni-provider.go
index 158b2e0ec..d5aaf8edf 100644
--- a/market/ipni/ipni-provider/ipni-provider.go
+++ b/market/ipni/ipni-provider/ipni-provider.go
@@ -367,7 +367,7 @@ func (p *Provider) handleGet(w http.ResponseWriter, r *http.Request) {
 		entry, err := p.sc.GetEntry(r.Context(), b)
 		if err != nil {
 			if errors.Is(err, chunker.ErrNotFound) {
-				log.Debugw("No Content Found", "CID", b.String())
+				log.Warnw("No Content Found", "CID", b.String())
 				http.Error(w, "", http.StatusNotFound)
 				return
 			}
diff --git a/web/api/webrpc/ipni.go b/web/api/webrpc/ipni.go
index 93c583f72..b4503c5e3 100644
--- a/web/api/webrpc/ipni.go
+++ b/web/api/webrpc/ipni.go
@@ -22,6 +22,7 @@ type IpniAd struct {
 	AdCid      string         `db:"ad_cid" json:"ad_cid"`
 	ContextID  []byte         `db:"context_id" json:"context_id"`
 	IsRM       bool           `db:"is_rm" json:"is_rm"`
+	IsSkip     bool           `db:"is_skip" json:"is_skip"`
 	PreviousAd sql.NullString `db:"previous"`
 	Previous   string         `json:"previous"`
 	SpID       int64          `db:"sp_id" json:"sp_id"`
@@ -50,6 +51,7 @@ func (a *WebRPC) GetAd(ctx context.Context, ad string) (*IpniAd, error) {
 		ip.ad_cid,
 		ip.context_id,
 		ip.is_rm,
+		ip.is_skip,
 		ip.previous,
 		ipp.sp_id,
 		ip.addresses,
@@ -68,6 +70,7 @@ func (a *WebRPC) GetAd(ctx context.Context, ad string) (*IpniAd, error) {
 		ip.ad_cid,
 		ip.context_id,
 		ip.is_rm,
+		ip.is_skip,
 		ip.previous,
 		ipp.sp_id,
 		ip.addresses,
@@ -328,3 +331,16 @@ func (a *WebRPC) IPNIEntry(ctx context.Context, block cid.Cid) (*EntryInfo,
erro

 	return &entry, nil
 }
+
+// IPNISetSkip sets the is_skip flag on the ad identified by adCid and errors if no row was updated.
+func (a *WebRPC) IPNISetSkip(ctx context.Context, adCid cid.Cid, skip bool) error {
+	n, err := a.deps.DB.Exec(ctx, `UPDATE ipni SET is_skip = $1 WHERE ad_cid = $2`, skip, adCid.String())
+	if err != nil {
+		return xerrors.Errorf("updating ipni set: %w", err)
+	}
+	if n == 0 {
+		return xerrors.Errorf("no ipni ad found with cid %s", adCid)
+	}
+
+	return nil
+}
diff --git a/web/static/pages/ipni/ipni_search.mjs b/web/static/pages/ipni/ipni_search.mjs
index a571e8c94..7f26412dd 100644
--- a/web/static/pages/ipni/ipni_search.mjs
+++ b/web/static/pages/ipni/ipni_search.mjs
@@ -55,6 +55,17 @@ class IpniSearch extends LitElement {
         }
     }

+    async handleSetSkip(ad, skipValue) {
+        try {
+            await RPCCall('IPNISetSkip', [{"/": ad}, skipValue]);
+            await this.handleSearch(); // Reload data after setting skip
+            this.errorMessage = '';
+        } catch (error) {
+            console.error('Error setting skip:', error);
+            this.errorMessage = 'Failed to set skip value.';
+        }
+    }
+
     handleScanClick() {
         this.showEntryGrid = true;
     }
@@ -73,7 +84,7 @@ class IpniSearch extends LitElement {