Skip to content

Commit

Permalink
ipni: Ad skip option
Browse files Browse the repository at this point in the history
  • Loading branch information
magik6k committed Dec 3, 2024
1 parent 3db7204 commit 5ed280a
Show file tree
Hide file tree
Showing 6 changed files with 116 additions and 8 deletions.
8 changes: 7 additions & 1 deletion harmony/harmonydb/sql/20240823-ipni.sql
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,9 @@ CREATE TABLE ipni (
-- metadata column in not required as Curio only supports one type of metadata(HTTP)
is_rm BOOLEAN NOT NULL,

-- skip added in 20241106-market-fixes.sql
-- is_skip BOOLEAN NOT NULL DEFAULT FALSE, -- set to true means return 404 for related entries

previous TEXT, -- previous ad will only be null for first ad in chain

provider TEXT NOT NULL, -- peerID from libp2p, this is main identifier on IPNI side
Expand All @@ -33,7 +36,10 @@ CREATE INDEX ipni_provider_order_number ON ipni(provider, order_number);
CREATE UNIQUE INDEX ipni_ad_cid ON ipni(ad_cid);

-- This index will speed up lookups based on the ad_cid, which is frequently used to identify specific ads
CREATE UNIQUE INDEX ipni_context_id ON ipni(context_id, ad_cid, is_rm);
CREATE UNIQUE INDEX ipni_context_id ON ipni(context_id, ad_cid, is_rm); -- dropped in 20241106-market-fixes.sql
-- 20241106-market-fixes.sql:
-- CREATE INDEX ipni_context_id ON ipni(context_id, ad_cid, is_rm, is_skip) -- non-unique to allow multiple skips
-- CREATE INDEX ipni_entries_skip ON ipni(entries, is_skip, piece_cid);

-- Since the get_ad_chain function relies on both provider and ad_cid to find the order_number, this index will optimize that query:
CREATE INDEX ipni_provider_ad_cid ON ipni(provider, ad_cid);
Expand Down
12 changes: 11 additions & 1 deletion harmony/harmonydb/sql/20241106-market-fixes.sql
Original file line number Diff line number Diff line change
Expand Up @@ -10,4 +10,14 @@ alter table market_mk12_deals
add proposal_cid text not null;

CREATE INDEX market_mk12_deals_proposal_cid_index
ON market_mk12_deals (proposal_cid);
ON market_mk12_deals (proposal_cid);


-- Add the is_skip column to the ipni table
ALTER TABLE ipni
ADD COLUMN is_skip BOOLEAN NOT NULL DEFAULT FALSE; -- set to true means return 404 for related entries

DROP INDEX IF EXISTS ipni_context_id;
CREATE INDEX ipni_context_id ON ipni(context_id, ad_cid, is_rm, is_skip);

CREATE INDEX ipni_entries_skip ON ipni(entries, is_skip, piece_cid);
43 changes: 39 additions & 4 deletions market/ipni/chunker/serve-chunker.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"context"
"encoding/hex"
"errors"
"github.com/yugabyte/pgx/v5"
"io"
"time"

Expand Down Expand Up @@ -33,6 +34,8 @@ var (
ErrNotFound = errors.New("not found")
)

const NoSkipCacheTTL = 3 * time.Minute

type ipniEntry struct {
Data []byte
Prev cid.Cid
Expand All @@ -45,21 +48,24 @@ type ServeChunker struct {
cpr *cachedreader.CachedPieceReader

entryCache *lru.Cache[cid.Cid, *promise.Promise[result.Result[ipniEntry]]]

// small cache keeping track of which piece CIDs shouldn't be skipped. Entries expire after NoSkipCacheTTL
noSkipCache *lru.Cache[cid.Cid, time.Time]
}

// Entries are 0.5MiB in size, so we do ~10MiB of caching here
// This cache is only useful in the edge case when entry reads are very slow and time out - this makes retried reads faster
const EntryCacheSize = 20

func NewServeChunker(db *harmonydb.DB, pieceProvider *pieceprovider.PieceProvider, indexStore *indexstore.IndexStore, cpr *cachedreader.CachedPieceReader) *ServeChunker {
entryCache := must.One(lru.New[cid.Cid, *promise.Promise[result.Result[ipniEntry]]](EntryCacheSize))

return &ServeChunker{
db: db,
pieceProvider: pieceProvider,
indexStore: indexStore,
cpr: cpr,
entryCache: entryCache,

entryCache: must.One(lru.New[cid.Cid, *promise.Promise[result.Result[ipniEntry]]](EntryCacheSize)),
noSkipCache: must.One(lru.New[cid.Cid, time.Time](EntryCacheSize)),
}
}

Expand Down Expand Up @@ -102,6 +108,9 @@ func (p *ServeChunker) getEntry(rctx context.Context, block cid.Cid, speculated
if v.Error == nil {
prevChunk = v.Value.Prev
return v.Value.Data, nil
} else if v.Error == ErrNotFound {
log.Errorw("Cached promise skip", "block", block, "prev", prevChunk, "err", err)
return v.Value.Data, v.Error
}
log.Errorw("Error in cached promise", "block", block, "error", v.Error)
}
Expand Down Expand Up @@ -159,12 +168,24 @@ func (p *ServeChunker) getEntry(rctx context.Context, block cid.Cid, speculated
}

chunk := ipniChunks[0]

pieceCid, err := cid.Parse(chunk.PieceCID)
if err != nil {
return nil, xerrors.Errorf("parsing piece CID: %w", err)
}

if leave, ok := p.noSkipCache.Get(pieceCid); !ok || time.Now().After(leave) {
skip, err := p.checkIsEntrySkip(ctx, block)
if err != nil {
return nil, xerrors.Errorf("checking entry skipped for block %s: %w", block, err)
}
if skip {
log.Warnw("Skipped entry skipped for block", "block", block)
return nil, ErrNotFound
}
}

p.noSkipCache.Add(pieceCid, time.Now().Add(NoSkipCacheTTL))

var next ipld.Link
if chunk.PrevCID != nil {
prevChunk, err = cid.Parse(*chunk.PrevCID)
Expand Down Expand Up @@ -298,3 +319,17 @@ func (p *ServeChunker) reconstructChunkFromDB(ctx context.Context, chunk, piece

return b.Bytes(), nil
}

func (p *ServeChunker) checkIsEntrySkip(ctx context.Context, entry cid.Cid) (bool, error) {
// CREATE INDEX ipni_entries_skip ON ipni(entries, is_skip, piece_cid);
var isSkip bool
err := p.db.QueryRow(ctx, `SELECT is_skip FROM ipni WHERE entries = $1`, entry).Scan(&isSkip)
if err != nil {
if err == pgx.ErrNoRows {
return false, nil
}
return false, err
}

return isSkip, nil
}
2 changes: 1 addition & 1 deletion market/ipni/ipni-provider/ipni-provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -367,7 +367,7 @@ func (p *Provider) handleGet(w http.ResponseWriter, r *http.Request) {
entry, err := p.sc.GetEntry(r.Context(), b)
if err != nil {
if errors.Is(err, chunker.ErrNotFound) {
log.Debugw("No Content Found", "CID", b.String())
log.Warnw("No Content Found", "CID", b.String())
http.Error(w, "", http.StatusNotFound)
return
}
Expand Down
16 changes: 16 additions & 0 deletions web/api/webrpc/ipni.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ type IpniAd struct {
AdCid string `db:"ad_cid" json:"ad_cid"`
ContextID []byte `db:"context_id" json:"context_id"`
IsRM bool `db:"is_rm" json:"is_rm"`
IsSkip bool `db:"is_skip" json:"is_skip"`
PreviousAd sql.NullString `db:"previous"`
Previous string `json:"previous"`
SpID int64 `db:"sp_id" json:"sp_id"`
Expand Down Expand Up @@ -50,6 +51,7 @@ func (a *WebRPC) GetAd(ctx context.Context, ad string) (*IpniAd, error) {
ip.ad_cid,
ip.context_id,
ip.is_rm,
ip.is_skip,
ip.previous,
ipp.sp_id,
ip.addresses,
Expand All @@ -68,6 +70,7 @@ func (a *WebRPC) GetAd(ctx context.Context, ad string) (*IpniAd, error) {
ip.ad_cid,
ip.context_id,
ip.is_rm,
ip.is_skip,
ip.previous,
ipp.sp_id,
ip.addresses,
Expand Down Expand Up @@ -328,3 +331,16 @@ func (a *WebRPC) IPNIEntry(ctx context.Context, block cid.Cid) (*EntryInfo, erro

return &entry, nil
}

func (a *WebRPC) IPNISetSkip(ctx context.Context, adCid cid.Cid, skip bool) error {
n, err := a.deps.DB.Exec(ctx, `UPDATE ipni SET is_skip = $1 WHERE ad_cid = $2`, skip, adCid.String())
if err != nil {
return xerrors.Errorf("updating ipni set: %w", err)
}

if n == 0 {
return xerrors.Errorf("ipni set is zero")
}

return nil
}
43 changes: 42 additions & 1 deletion web/static/pages/ipni/ipni_search.mjs
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,17 @@ class IpniSearch extends LitElement {
}
}

async handleSetSkip(ad, skipValue) {
try {
await RPCCall('IPNISetSkip', [{"/": ad}, skipValue]);
await this.handleSearch(); // Reload data after setting skip
this.errorMessage = '';
} catch (error) {
console.error('Error setting skip:', error);
this.errorMessage = 'Failed to set skip value.';
}
}

handleScanClick() {
this.showEntryGrid = true;
}
Expand All @@ -73,7 +84,7 @@ class IpniSearch extends LitElement {
<div class="search-container">
<input
type="text"
placeholder="Enter Ad CID"
placeholder="Enter Ad/Entry CID"
.value="${this.searchTerm}"
@input="${this.handleInput}"
/>
Expand Down Expand Up @@ -103,6 +114,32 @@ class IpniSearch extends LitElement {
<th>Is Remove</th>
<td>${this.adData.is_rm}</td>
</tr>
<tr>
<th>Should Skip</th>
<td>
<span>${this.adData.is_skip}</span>
<details style="display: inline-block">
<summary style="display: inline-block" class="btn btn-secondary btn-sm">Set</summary>
${this.adData.is_skip
? html`
<button
class="btn btn-secondary btn-sm"
@click="${() => this.handleSetSkip(this.adData.ad_cid, false)}"
>
Disable Skip
</button>
`
: html`
<button
class="btn btn-danger btn-sm"
@click="${() => this.handleSetSkip(this.adData.ad_cid, true)}"
>
Enable Skip
</button>
`}
</details>
</td>
</tr>
<tr>
<th>Previous</th>
<td><a href="/pages/ipni/?ad_cid=${this.adData.previous}">${this.adData.previous}</a></td>
Expand Down Expand Up @@ -168,6 +205,10 @@ class IpniSearch extends LitElement {
background-color: var(--color-form-default);
color: var(--color-text-primary);
}
.btn-danger {
background-color: var(--color-danger-main);
}
.btn:hover,
.btn:focus,
Expand Down

0 comments on commit 5ed280a

Please sign in to comment.