diff --git a/harmony/harmonydb/sql/20240731-market-migration.sql b/harmony/harmonydb/sql/20240731-market-migration.sql
index aa3516545..1b766e24a 100644
--- a/harmony/harmonydb/sql/20240731-market-migration.sql
+++ b/harmony/harmonydb/sql/20240731-market-migration.sql
@@ -182,6 +182,8 @@ CREATE TABLE market_mk12_deal_pipeline (
complete BOOLEAN NOT NULL DEFAULT FALSE,
+ created_at TIMESTAMPTZ NOT NULL DEFAULT TIMEZONE('UTC', NOW()),
+
constraint market_mk12_deal_pipeline_identity_key unique (uuid)
);
@@ -201,7 +203,7 @@ BEGIN
FROM
%I ssp
JOIN
- market_mk12_deal_pipeline dp ON ssp.sp_id = dp.sp_id AND ssp.sector_num = dp.sector
+ market_mk12_deal_pipeline dp ON ssp.sp_id = dp.sp_id AND ssp.sector_number = dp.sector
WHERE
ssp.task_id_move_storage = $1', sealing_table);
ELSIF sealing_table = 'sectors_snap_pipeline' THEN
@@ -236,8 +238,6 @@ FOR pms IN EXECUTE query USING task_id
EXCEPTION
WHEN OTHERS THEN
- -- Rollback the transaction and raise the exception for Go to catch
- ROLLBACK;
RAISE EXCEPTION 'Failed to create indexing task: %', SQLERRM;
END;
$$ LANGUAGE plpgsql;
diff --git a/market/libp2p/libp2p.go b/market/libp2p/libp2p.go
index 565ad9682..57bf807fe 100644
--- a/market/libp2p/libp2p.go
+++ b/market/libp2p/libp2p.go
@@ -266,13 +266,15 @@ const providerReadDeadline = 10 * time.Second
const providerWriteDeadline = 10 * time.Second
func SafeHandle(h network.StreamHandler) network.StreamHandler {
- defer func() {
- if r := recover(); r != nil {
- netlog.Error("panic occurred", "stack", debug.Stack())
- }
- }()
+ return func(stream network.Stream) {
+ defer func() {
+ if r := recover(); r != nil {
+ netlog.Error("panic occurred\n", string(debug.Stack()))
+ }
+ }()
- return h
+ h(stream)
+ }
}
// DealProvider listens for incoming deal proposals over libp2p
@@ -456,7 +458,7 @@ func (p *DealProvider) handleNewDealStream(s network.Stream) {
reqLog.Infow("received deal proposal")
startExec := time.Now()
- var res *mk12.ProviderDealRejectionInfo
+ var res mk12.ProviderDealRejectionInfo
if lo.Contains(p.disabledMiners, proposal.ClientDealProposal.Proposal.Provider) {
reqLog.Infow("Deal rejected as libp2p is disabled for provider", "deal", proposal.DealUUID, "provider", proposal.ClientDealProposal.Proposal.Provider)
@@ -466,11 +468,13 @@ func (p *DealProvider) handleNewDealStream(s network.Stream) {
// Start executing the deal.
// Note: This method just waits for the deal to be accepted, it doesn't
// wait for deal execution to complete.
- res, err := p.prov.ExecuteDeal(context.Background(), &proposal, s.Conn().RemotePeer())
+ eres, err := p.prov.ExecuteDeal(context.Background(), &proposal, s.Conn().RemotePeer())
reqLog.Debugw("processed deal proposal accept")
if err != nil {
reqLog.Warnw("deal proposal failed", "err", err, "reason", res.Reason)
}
+
+ res = *eres
}
// Log the response
diff --git a/market/mk12/mk12.go b/market/mk12/mk12.go
index b21b56edd..d383679e4 100644
--- a/market/mk12/mk12.go
+++ b/market/mk12/mk12.go
@@ -427,7 +427,7 @@ func (m *MK12) processDeal(ctx context.Context, deal *ProviderDealState) (*Provi
_, err = tx.Exec(`INSERT INTO market_mk12_deal_pipeline (uuid, sp_id, piece_cid, piece_size, offline, url, raw_size, should_index, announce)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9) ON CONFLICT (uuid) DO NOTHING`,
- deal.DealUuid.String(), mid, prop.PieceCID.String(), prop.PieceSize, deal.IsOffline, pieceIDUrl, deal.Transfer.Size,
+ deal.DealUuid.String(), mid, prop.PieceCID.String(), prop.PieceSize, deal.IsOffline, pieceIDUrl.String(), deal.Transfer.Size,
deal.FastRetrieval, deal.AnnounceToIPNI)
if err != nil {
return false, xerrors.Errorf("inserting deal into deal pipeline: %w", err)
diff --git a/market/storageingest/deal_ingest_snap.go b/market/storageingest/deal_ingest_snap.go
index 09d91a3ef..254a63490 100644
--- a/market/storageingest/deal_ingest_snap.go
+++ b/market/storageingest/deal_ingest_snap.go
@@ -33,6 +33,7 @@ import (
)
const IdealEndEpochBuffer = 2 * builtin.EpochsInDay
+const MaxEndEpochBufferUnverified = 180 * builtin.EpochsInDay
// assuming that snap takes up to 20min to get to submitting the message we want to avoid sectors from deadlines which will
// become immutable in the next 20min (40 epochs)
@@ -297,6 +298,8 @@ func (p *PieceIngesterSnap) AllocatePieceToSector(ctx context.Context, tx *harmo
vd.tmax = alloc.TermMax
maxExpiration = int64(head.Height() + alloc.TermMax)
+ } else {
+ maxExpiration = int64(piece.DealSchedule.EndEpoch) + MaxEndEpochBufferUnverified
}
propJson, err = json.Marshal(piece.PieceActivationManifest)
if err != nil {
diff --git a/tasks/storage-market/storage_market.go b/tasks/storage-market/storage_market.go
index dcbb89bd6..ca381ff3f 100644
--- a/tasks/storage-market/storage_market.go
+++ b/tasks/storage-market/storage_market.go
@@ -64,25 +64,33 @@ type CurioStorageDealMarket struct {
}
type MK12Pipeline struct {
- UUID string `db:"uuid"`
- SpID int64 `db:"sp_id"`
- Started bool `db:"started"`
- PieceCid string `db:"piece_cid"`
- Offline bool `db:"offline"`
- Downloaded bool `db:"downloaded"`
- RawSize int64 `db:"raw_size"`
- URL string `db:"url"`
- Headers json.RawMessage `db:"headers"`
-
- CommTaskID *int64 `db:"commp_task_id"`
- AfterCommp bool `db:"after_commp"`
- PSDWaitTime time.Time `db:"psd_wait_time"`
- PSDTaskID *int64 `db:"psd_task_id"`
- AfterPSD bool `db:"after_psd"`
- FindDealTaskID *int64 `db:"find_deal_task_id"`
- AfterFindDeal bool `db:"after_find_deal"`
- Sector *int64 `db:"sector"`
- Offset *int64 `db:"sector_offset"`
+ UUID string `db:"uuid"`
+ SpID int64 `db:"sp_id"`
+
+ // started after data download
+ Started bool `db:"started"`
+ PieceCid string `db:"piece_cid"`
+ Offline bool `db:"offline"` // data is not downloaded before starting the deal
+ RawSize int64 `db:"raw_size"`
+ URL *string `db:"url"`
+ Headers json.RawMessage `db:"headers"`
+
+ // commP task
+ CommTaskID *int64 `db:"commp_task_id"`
+ AfterCommp bool `db:"after_commp"`
+
+ // PSD task
+ PSDWaitTime *time.Time `db:"psd_wait_time"` // set in commp to now
+ PSDTaskID *int64 `db:"psd_task_id"`
+ AfterPSD bool `db:"after_psd"`
+
+ // Find Deal task (just looks at the chain for the deal ID)
+ FindDealTaskID *int64 `db:"find_deal_task_id"`
+ AfterFindDeal bool `db:"after_find_deal"`
+
+ // Sector the deal was assigned into
+ Sector *int64 `db:"sector"`
+ Offset *int64 `db:"sector_offset"`
}
func NewCurioStorageDealMarket(miners []address.Address, db *harmonydb.DB, cfg *config.CurioConfig, sc *ffi.SealCalls, mapi storageMarketAPI) *CurioStorageDealMarket {
@@ -208,12 +216,14 @@ func (d *CurioStorageDealMarket) processMK12Deals(ctx context.Context) {
p.after_psd as after_psd,
p.find_deal_task_id as find_deal_task_id,
p.after_find_deal as after_find_deal,
- p.psd_wait_time as psd_wait_time
+ p.psd_wait_time as psd_wait_time,
+
+ p.sector as sector,
+ p.sector_offset as sector_offset
FROM
market_mk12_deal_pipeline p
LEFT JOIN
market_mk12_deals b ON p.uuid = b.uuid
- WHERE p.started = TRUE
ORDER BY b.start_epoch ASC;`)
if err != nil {
@@ -232,7 +242,7 @@ func (d *CurioStorageDealMarket) processMK12Deals(ctx context.Context) {
deal := deal
err := d.processMk12Deal(ctx, deal)
if err != nil {
- log.Errorf("%w", err)
+ log.Errorf("process deal: %s", err)
}
}
}
@@ -242,8 +252,8 @@ func (d *CurioStorageDealMarket) processMk12Deal(ctx context.Context, deal MK12P
// Try to mark the deal as started
if !deal.Started {
// Check if download is finished and update the deal state in DB
- if deal.URL != "" {
- goUrl, err := url.Parse(deal.URL)
+ if deal.URL != nil && *deal.URL != "" {
+ goUrl, err := url.Parse(*deal.URL)
if err != nil {
return xerrors.Errorf("UUID: %s parsing data URL: %w", deal.UUID, err)
}
@@ -271,7 +281,6 @@ func (d *CurioStorageDealMarket) processMk12Deal(ctx context.Context, deal MK12P
return xerrors.Errorf("failed to mark deal %s as started: %w", deal.UUID, err)
}
log.Infof("UUID: %s deal started successfully", deal.UUID)
- return nil
}
}
} else {
@@ -317,6 +326,17 @@ func (d *CurioStorageDealMarket) processMk12Deal(ctx context.Context, deal MK12P
// Create Find Deal task
if deal.Started && deal.AfterCommp && deal.AfterPSD && !deal.AfterFindDeal && deal.FindDealTaskID == nil {
+ var executed bool
+ err := d.db.QueryRow(ctx, `SELECT EXISTS(SELECT TRUE FROM market_mk12_deals d
+ INNER JOIN message_waits mw ON mw.signed_message_cid = d.publish_cid
+ WHERE mw.executed_tsk_cid IS NOT NULL AND d.uuid = $1)`, deal.UUID).Scan(&executed)
+ if err != nil {
+ return xerrors.Errorf("UUID: %s: checking if the message is executed: %w", deal.UUID, err)
+ }
+ if !executed {
+ return nil
+ }
+
d.adders[pollerFindDeal].Val(ctx)(func(id harmonytask.TaskID, tx *harmonydb.Tx) (shouldCommit bool, err error) {
// update
n, err := tx.Exec(`UPDATE market_mk12_deal_pipeline SET find_deal_task_id = $1
@@ -337,7 +357,7 @@ func (d *CurioStorageDealMarket) processMk12Deal(ctx context.Context, deal MK12P
if deal.AfterFindDeal && deal.Sector == nil && deal.Offset == nil {
err := d.ingestDeal(ctx, deal)
if err != nil {
- return err
+ return xerrors.Errorf("ingest deal: %w", err)
}
}
return nil
@@ -361,7 +381,7 @@ type MarketMK12Deal struct {
PublishCid string `db:"publish_cid"`
FastRetrieval bool `db:"fast_retrieval"`
AnnounceToIpni bool `db:"announce_to_ipni"`
- Error string `db:"error"`
+ Error *string `db:"error"`
}
func (d *CurioStorageDealMarket) findURLForOfflineDeals(ctx context.Context, deal string, pcid string) error {
@@ -461,7 +481,7 @@ func (d *CurioStorageDealMarket) addPSDTask(ctx context.Context, deals []MK12Pip
dm := make(map[int64]queue)
for _, deal := range deals {
- if deal.Started && deal.AfterCommp && !deal.AfterPSD && deal.PSDTaskID == nil {
+ if deal.Started && deal.AfterCommp && deal.PSDWaitTime != nil && !deal.AfterPSD && deal.PSDTaskID == nil {
// Check if the spID is already in the map
if q, exists := dm[deal.SpID]; exists {
// Append the UUID to the deals list
@@ -469,7 +489,7 @@ func (d *CurioStorageDealMarket) addPSDTask(ctx context.Context, deals []MK12Pip
// Update the time if the current deal's time is older
if deal.PSDWaitTime.Before(q.t) {
- q.t = deal.PSDWaitTime
+ q.t = *deal.PSDWaitTime
}
// Update the map with the new queue
@@ -478,7 +498,7 @@ func (d *CurioStorageDealMarket) addPSDTask(ctx context.Context, deals []MK12Pip
// Add a new entry to the map if spID is not present
dm[deal.SpID] = queue{
deals: []string{deal.UUID},
- t: deal.PSDWaitTime,
+ t: *deal.PSDWaitTime,
}
}
}
@@ -488,19 +508,29 @@ func (d *CurioStorageDealMarket) addPSDTask(ctx context.Context, deals []MK12Pip
maxDeals := d.cfg.Market.StorageMarketConfig.MK12.MaxDealsPerPublishMsg
for _, q := range dm {
- if q.t.Add(time.Duration(publishPeriod)).After(time.Now()) || uint64(len(q.deals)) > maxDeals {
+ if q.t.Add(time.Duration(publishPeriod)).Before(time.Now()) || uint64(len(q.deals)) > maxDeals {
d.adders[pollerPSD].Val(ctx)(func(id harmonytask.TaskID, tx *harmonydb.Tx) (shouldCommit bool, err error) {
// update
- n, err := tx.Exec(`UPDATE market_mk12_deal_pipeline SET psd_task_id = $1
- WHERE uuid = ANY($2) AND started = TRUE AND after_commp = TRUE
- AND psd_task_id IS NULL`, id, q.deals)
- if err != nil {
- return false, xerrors.Errorf("updating deal pipeline: %w", err)
+ var n int
+ for _, deal := range q.deals {
+ u, err := tx.Exec(`UPDATE market_mk12_deal_pipeline SET psd_task_id = $1
+ WHERE uuid = $2 AND started = TRUE AND after_commp = TRUE
+ AND psd_task_id IS NULL`, id, deal)
+ if err != nil {
+ return false, xerrors.Errorf("updating deal pipeline: %w", err)
+ }
+ n += u
+ }
+
+ if n > 0 {
+ log.Infof("PSD task created for %d deals %s", n, q.deals)
}
+
return n > 0, nil
})
+ } else {
+ log.Infow("PSD task not created as the time is not yet reached", "time", q.t.Add(time.Duration(publishPeriod)), "deals", q.deals)
}
- log.Infof("PSD task created successfully for deals %s", q.deals)
}
return nil
}
@@ -530,13 +560,11 @@ func (d *CurioStorageDealMarket) ingestDeal(ctx context.Context, deal MK12Pipeli
publish_cid,
fast_retrieval,
announce_to_ipni,
- url,
- url_headers,
error
FROM market_mk12_deals
WHERE uuid = $1;`, deal.UUID)
if err != nil {
- return false, xerrors.Errorf("failed to get MK12 deals from DB")
+ return false, xerrors.Errorf("failed to get MK12 deals from DB: %w", err)
}
if len(dbdeals) != 1 {
@@ -573,7 +601,7 @@ func (d *CurioStorageDealMarket) ingestDeal(ctx context.Context, deal MK12Pipeli
KeepUnsealed: true,
}
- dealUrl, err := url.Parse(deal.URL)
+ dealUrl, err := url.Parse(*deal.URL)
if err != nil {
return false, xerrors.Errorf("UUID: %s: %w", deal.UUID, err)
}
diff --git a/tasks/storage-market/task_commp.go b/tasks/storage-market/task_commp.go
index a4eea3151..c8297477f 100644
--- a/tasks/storage-market/task_commp.go
+++ b/tasks/storage-market/task_commp.go
@@ -55,10 +55,10 @@ func (c *CommpTask) Do(taskID harmonytask.TaskID, stillOwned func() bool) (done
UUID string `db:"uuid"`
URL *string `db:"url"`
Headers json.RawMessage `db:"headers"`
- Size *int64 `db:"raw_size"`
+ RawSize int64 `db:"raw_size"`
}
- err = c.db.Select(ctx, &pieces, `SELECT uuid, url, headers, raw_size, piece_cid
+ err = c.db.Select(ctx, &pieces, `SELECT uuid, url, headers, raw_size, piece_cid, piece_size
FROM market_mk12_deal_pipeline WHERE commp_task_id = $1`, taskID)
if err != nil {
@@ -152,7 +152,7 @@ func (c *CommpTask) Do(taskID harmonytask.TaskID, stillOwned func() bool) (done
reader = resp.Body
}
- pReader, _ := padreader.New(reader, uint64(*piece.Size))
+ pReader, pSz := padreader.New(reader, uint64(piece.RawSize))
defer func() {
_ = closer.Close()
@@ -164,8 +164,8 @@ func (c *CommpTask) Do(taskID harmonytask.TaskID, stillOwned func() bool) (done
return false, xerrors.Errorf("copy into commp writer: %w", err)
}
- if written != *piece.Size {
- return false, xerrors.Errorf("number of bytes written to CommP writer %d not equal to the file size %d", written, piece.Size)
+ if written != int64(pSz) {
+ return false, xerrors.Errorf("number of bytes written to CommP writer %d not equal to the file size %d", written, pSz)
}
calculatedCommp, err := w.Sum()
@@ -223,6 +223,12 @@ func (c *CommpTask) CanAccept(ids []harmonytask.TaskID, engine *harmonytask.Task
// ParkPiece should be scheduled on same node which has the piece
// Remote HTTP ones can be scheduled on any node
+ if true {
+ // TODO make this a setting
+ id := ids[0]
+ return &id, nil
+ }
+
ctx := context.Background()
var tasks []struct {
diff --git a/tasks/storage-market/task_find_deal.go b/tasks/storage-market/task_find_deal.go
index e71e601f6..bef0b03dc 100644
--- a/tasks/storage-market/task_find_deal.go
+++ b/tasks/storage-market/task_find_deal.go
@@ -120,7 +120,7 @@ func (f *FindDealTask) Do(taskID harmonytask.TaskID, stillOwned func() bool) (do
err = f.db.Select(ctx, &execResult, `SELECT executed_tsk_cid, executed_tsk_epoch, executed_msg_cid,
executed_rcpt_exitcode, executed_rcpt_gas_used
FROM message_waits
- WHERE signed_message_cid AND executed_tsk_epoch IS NOT NULL`, bd.PublishCid)
+ WHERE signed_message_cid = $1 AND executed_tsk_epoch IS NOT NULL`, bd.PublishCid)
if err != nil {
fdLog.Errorw("failed to query message_waits", "error", err)
}
diff --git a/tasks/storage-market/task_psd.go b/tasks/storage-market/task_psd.go
index 0ad56a177..639c8fd0b 100644
--- a/tasks/storage-market/task_psd.go
+++ b/tasks/storage-market/task_psd.go
@@ -78,7 +78,7 @@ func (p *PSDTask) Do(taskID harmonytask.TaskID, stillOwned func() bool) (done bo
FROM
market_mk12_deal_pipeline p
JOIN
- market_12_deals b ON p.uuid = b.uuid
+ market_mk12_deals b ON p.uuid = b.uuid
WHERE
p.psd_task_id = $1;`, taskID)
@@ -102,7 +102,7 @@ func (p *PSDTask) Do(taskID harmonytask.TaskID, stillOwned func() bool) (done bo
return false, xerrors.Errorf("unmarshal proposal: %w", err)
}
- var sig *crypto.Signature
+ var sig crypto.Signature
err = sig.UnmarshalBinary(d.Sig)
if err != nil {
return false, xerrors.Errorf("unmarshal signature: %w", err)
@@ -112,7 +112,7 @@ func (p *PSDTask) Do(taskID harmonytask.TaskID, stillOwned func() bool) (done bo
uuid: d.UUID,
sprop: market.ClientDealProposal{
Proposal: prop,
- ClientSignature: *sig,
+ ClientSignature: sig,
},
})
}
diff --git a/web/api/webrpc/market.go b/web/api/webrpc/market.go
new file mode 100644
index 000000000..d627b77ef
--- /dev/null
+++ b/web/api/webrpc/market.go
@@ -0,0 +1,126 @@
+package webrpc
+
+import (
+ "context"
+ "fmt"
+ "time"
+)
+
+type StorageAsk struct {
+ SpID int64 `db:"sp_id"`
+ Price int64 `db:"price"`
+ VerifiedPrice int64 `db:"verified_price"`
+ MinSize int64 `db:"min_size"`
+ MaxSize int64 `db:"max_size"`
+ CreatedAt int64 `db:"created_at"`
+ Expiry int64 `db:"expiry"`
+ Sequence int64 `db:"sequence"`
+}
+
+func (a *WebRPC) GetStorageAsk(ctx context.Context, spID int64) (*StorageAsk, error) {
+ var asks []StorageAsk
+ err := a.deps.DB.Select(ctx, &asks, `
+ SELECT sp_id, price, verified_price, min_size, max_size, created_at, expiry, sequence
+ FROM market_mk12_storage_ask
+ WHERE sp_id = $1
+ `, spID)
+ if err != nil {
+ return nil, err
+ }
+ if len(asks) == 0 {
+ return nil, fmt.Errorf("no storage ask found for sp_id %d", spID)
+ }
+ return &asks[0], nil
+}
+
+func (a *WebRPC) SetStorageAsk(ctx context.Context, ask *StorageAsk) error {
+ err := a.deps.DB.QueryRow(ctx, `
+ INSERT INTO market_mk12_storage_ask (
+ sp_id, price, verified_price, min_size, max_size, created_at, expiry, sequence
+ ) VALUES (
+ $1, $2, $3, $4, $5, $6, $7, 0
+ )
+ ON CONFLICT (sp_id) DO UPDATE SET
+ price = EXCLUDED.price,
+ verified_price = EXCLUDED.verified_price,
+ min_size = EXCLUDED.min_size,
+ max_size = EXCLUDED.max_size,
+ created_at = EXCLUDED.created_at,
+ expiry = EXCLUDED.expiry,
+ sequence = market_mk12_storage_ask.sequence + 1
+ RETURNING sequence
+ `, ask.SpID, ask.Price, ask.VerifiedPrice, ask.MinSize, ask.MaxSize, ask.CreatedAt, ask.Expiry).Scan(&ask.Sequence)
+ if err != nil {
+ return fmt.Errorf("failed to insert/update storage ask: %w", err)
+ }
+
+ return nil
+}
+
+type MK12Pipeline struct {
+ UUID string `db:"uuid" json:"uuid"`
+ SpID int64 `db:"sp_id" json:"sp_id"`
+ Started bool `db:"started" json:"started"`
+ PieceCid string `db:"piece_cid" json:"piece_cid"`
+ PieceSize int64 `db:"piece_size" json:"piece_size"`
+ RawSize *int64 `db:"raw_size" json:"raw_size"`
+ Offline bool `db:"offline" json:"offline"`
+ URL *string `db:"url" json:"url"`
+ Headers []byte `db:"headers" json:"headers"`
+ CommTaskID *int64 `db:"commp_task_id" json:"commp_task_id"`
+ AfterCommp bool `db:"after_commp" json:"after_commp"`
+ PSDTaskID *int64 `db:"psd_task_id" json:"psd_task_id"`
+ AfterPSD bool `db:"after_psd" json:"after_psd"`
+ PSDWaitTime *time.Time `db:"psd_wait_time" json:"psd_wait_time"`
+ FindDealTaskID *int64 `db:"find_deal_task_id" json:"find_deal_task_id"`
+ AfterFindDeal bool `db:"after_find_deal" json:"after_find_deal"`
+ Sector *int64 `db:"sector" json:"sector"`
+ Offset *int64 `db:"sector_offset" json:"sector_offset"`
+ CreatedAt time.Time `db:"created_at" json:"created_at"`
+ Complete bool `db:"complete" json:"complete"`
+}
+
+func (a *WebRPC) GetDealPipelines(ctx context.Context, limit int, offset int) ([]MK12Pipeline, error) {
+ if limit <= 0 {
+ limit = 25
+ }
+ if limit > 100 {
+ limit = 100
+ }
+ if offset < 0 {
+ offset = 0
+ }
+
+ var pipelines []MK12Pipeline
+ err := a.deps.DB.Select(ctx, &pipelines, `
+ SELECT
+ uuid,
+ sp_id,
+ started,
+ piece_cid,
+ piece_size,
+ raw_size,
+ offline,
+ url,
+ headers,
+ commp_task_id,
+ after_commp,
+ psd_task_id,
+ after_psd,
+ psd_wait_time,
+ find_deal_task_id,
+ after_find_deal,
+ sector,
+ sector_offset,
+ created_at,
+ complete
+ FROM market_mk12_deal_pipeline
+ ORDER BY created_at DESC
+ LIMIT $1 OFFSET $2
+ `, limit, offset)
+ if err != nil {
+ return nil, fmt.Errorf("failed to fetch deal pipelines: %w", err)
+ }
+
+ return pipelines, nil
+}
diff --git a/web/static/actor-summary.mjs b/web/static/actor-summary.mjs
index 4643f05ba..49cf61d04 100644
--- a/web/static/actor-summary.mjs
+++ b/web/static/actor-summary.mjs
@@ -203,16 +203,6 @@ class ActorSummary extends LitElement {
}, 30000);
}
- renderWins(win1, win7, win30) {
- return html`
-
- 1day: ${win1} |
- 7day: ${win7} |
- 30day: ${win30} |
-
- `;
- }
-
renderDeadlines(deadlines) {
return html`
@@ -242,7 +232,7 @@ class ActorSummary extends LitElement {
Balance |
Available |
Worker |
-
Wins |
+
Wins 1d/7d/30d |
Expirations |
@@ -258,7 +248,7 @@ class ActorSummary extends LitElement {
${entry.ActorBalance} |
${entry.ActorAvailable} |
${entry.WorkerBalance} |
-
${this.renderWins(entry.Win1, entry.Win7, entry.Win30)} |
+
${entry.Win1}/${entry.Win7}/${entry.Win30} |
|
`)}
diff --git a/web/static/pages/market/deal-pipelines.mjs b/web/static/pages/market/deal-pipelines.mjs
new file mode 100644
index 000000000..b4c326ed1
--- /dev/null
+++ b/web/static/pages/market/deal-pipelines.mjs
@@ -0,0 +1,152 @@
+// deal-pipelines.mjs
+
+import { LitElement, html, css } from 'https://cdn.jsdelivr.net/gh/lit/dist@3/all/lit-all.min.js';
+import RPCCall from '/lib/jsonrpc.mjs';
+
+class DealPipelines extends LitElement {
+ static properties = {
+ deals: { type: Array },
+ limit: { type: Number },
+ offset: { type: Number },
+ totalCount: { type: Number },
+ };
+
+ constructor() {
+ super();
+ this.deals = [];
+ this.limit = 25;
+ this.offset = 0;
+ this.totalCount = 0;
+ this.loadData();
+ }
+
+ async loadData() {
+ try {
+ const params = [this.limit, this.offset];
+ const deals = await RPCCall('GetDealPipelines', params);
+ this.deals = deals;
+
+ // Optionally, get the total count of deals for pagination (if your API supports it)
+ // For this example, we'll assume we have a way to get the total count
+ // this.totalCount = await RPCCall('GetDealPipelinesCount', []);
+
+ // If total count is not available, we can infer whether there are more pages based on the number of deals fetched
+ this.requestUpdate();
+ } catch (error) {
+ console.error('Failed to load deal pipelines:', error);
+ }
+ }
+
+ nextPage() {
+ this.offset += this.limit;
+ this.loadData();
+ }
+
+ prevPage() {
+ if (this.offset >= this.limit) {
+ this.offset -= this.limit;
+ } else {
+ this.offset = 0;
+ }
+ this.loadData();
+ }
+
+ render() {
+ return html`
+
+
+
+
+
Deal Pipelines
+
+
+
+ UUID |
+ SP ID |
+ Piece CID |
+ Piece Size |
+ Created At |
+ Status |
+
+
+
+
+ ${this.deals.map(
+ (deal) => html`
+
+ ${deal.uuid} |
+ f0${deal.sp_id} |
+ ${deal.piece_cid} |
+ ${this.formatBytes(deal.piece_size)} |
+ ${new Date(deal.created_at).toLocaleString()} |
+ ${this.getDealStatus(deal)} |
+
+ `
+ )}
+
+
+
+
+ `;
+ }
+
+ formatBytes(bytes) {
+ const units = ['Bytes', 'KiB', 'MiB', 'GiB', 'TiB'];
+ let i = 0;
+ let size = bytes;
+ while (size >= 1024 && i < units.length - 1) {
+ size /= 1024;
+ i++;
+ }
+ if (i === 0) {
+ return `${size} ${units[i]}`;
+ } else {
+ return `${size.toFixed(2)} ${units[i]}`;
+ }
+ }
+
+ getDealStatus(deal) {
+ if (deal.complete) {
+ return 'Complete';
+ } else if (deal.sector) {
+ return 'Sealed';
+ } else if (deal.after_find_deal) {
+ return 'On Chain';
+ } else if (deal.after_psd) {
+ return 'Piece Added';
+ } else if (deal.after_commp) {
+ return 'CommP Calculated';
+ } else if (deal.started) {
+ return 'Started';
+ } else {
+ return 'Pending';
+ }
+ }
+
+ static styles = css`
+ .pagination-controls {
+ display: flex;
+ justify-content: space-between;
+ align-items: center;
+ margin-top: 1rem;
+ }
+ `;
+}
+
+customElements.define('deal-pipelines', DealPipelines);
\ No newline at end of file
diff --git a/web/static/pages/market/index.html b/web/static/pages/market/index.html
new file mode 100644
index 000000000..f8c887a30
--- /dev/null
+++ b/web/static/pages/market/index.html
@@ -0,0 +1,32 @@
+
+
+
+
Storage Marker
+
+
+
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/web/static/pages/market/market-asks.mjs b/web/static/pages/market/market-asks.mjs
new file mode 100644
index 000000000..d80385221
--- /dev/null
+++ b/web/static/pages/market/market-asks.mjs
@@ -0,0 +1,450 @@
+// market-asks.mjs
+
+import { LitElement, html, css } from 'https://cdn.jsdelivr.net/gh/lit/dist@3/all/lit-all.min.js';
+import RPCCall from '/lib/jsonrpc.mjs';
+
+class MarketAsks extends LitElement {
+ static properties = {
+ actorList: { type: Array },
+ spAsks: { type: Map },
+ updatingSpID: { type: Number },
+ newAsk: { type: Object },
+ sizeOptions: { type: Array },
+ };
+
+ constructor() {
+ super();
+ this.actorList = [];
+ this.spAsks = new Map();
+ this.updatingSpID = null;
+ this.newAsk = {};
+ this.sizeOptions = this.generateSizeOptions();
+ this.loadData();
+ }
+
+ // Generate size options from 64 GiB down to 128 bytes
+ generateSizeOptions() {
+ const sizes = [];
+ let sizeInBytes = 64 * 1024 ** 3; // Start with 64 GiB
+ const minSizeInBytes = 128; // Minimum size is 128 bytes
+
+ while (sizeInBytes >= minSizeInBytes) {
+ sizes.push({
+ value: sizeInBytes,
+ label: this.formatBytes(sizeInBytes),
+ });
+ sizeInBytes = sizeInBytes / 2; // Halve the size each time
+ }
+ return sizes;
+ }
+
+ // Format bytes into human-readable format
+ formatBytes(bytes) {
+ const units = ['Bytes', 'KiB', 'MiB', 'GiB', 'TiB'];
+ let i = 0;
+ let size = bytes;
+ while (size >= 1024 && i < units.length - 1) {
+ size /= 1024;
+ i++;
+ }
+ if (i === 0) {
+ return `${size} ${units[i]} (${bytes} bytes)`;
+ } else {
+ return `${size.toFixed(2)} ${units[i]} (${bytes} bytes)`;
+ }
+ }
+
+ async loadData() {
+ // Fetch actor list
+ const addresses = await RPCCall('ActorList', []);
+ // Map addresses to spIDs
+ const spIDs = addresses.map((addr) => parseInt(addr.substring(2)));
+ // Fetch storage asks for each spID
+ this.spAsks.clear();
+ await Promise.all(
+ spIDs.map(async (spID) => {
+ try {
+ const ask = await RPCCall('GetStorageAsk', [spID]);
+ this.spAsks.set(spID, ask);
+ } catch (error) {
+ // No ask for this spID
+ }
+ })
+ );
+ this.actorList = spIDs;
+ this.requestUpdate();
+ }
+
+ updateAsk(spID) {
+ // Open the update form
+ this.updatingSpID = spID;
+ const existingAsk = this.spAsks.get(spID) || {};
+ // Initialize newAsk with existing values or defaults
+ this.newAsk = {
+ SpID: spID,
+ Price: existingAsk.Price || '',
+ VerifiedPrice: existingAsk.VerifiedPrice || '',
+ MinSize: existingAsk.MinSize || 4 * 1024 ** 3, // Default to 4 GiB
+ MaxSize: existingAsk.MaxSize || 32 * 1024 ** 3, // Default to 32 GiB
+ PriceFIL: '1',
+ VerifiedPriceFIL: '0',
+ CreatedAt: '',
+ Expiry: '',
+ };
+
+ // If existing ask, convert AttoFIL/GiB/Epoch to FIL/TiB/Month
+ if (existingAsk.Price) {
+ this.newAsk.PriceFIL = this.attoFilToFilPerTiBPerMonth(existingAsk.Price);
+ }
+ if (existingAsk.VerifiedPrice) {
+ this.newAsk.VerifiedPriceFIL = this.attoFilToFilPerTiBPerMonth(existingAsk.VerifiedPrice);
+ }
+
+ this.requestUpdate();
+ }
+
+ async saveAsk() {
+ // Convert FIL/TiB/Month to AttoFIL/GiB/Epoch
+ const priceAtto = this.filToAttoFilPerGiBPerEpoch(parseFloat(this.newAsk.PriceFIL));
+ const verifiedPriceAtto = this.filToAttoFilPerGiBPerEpoch(parseFloat(this.newAsk.VerifiedPriceFIL));
+
+ // Set CreatedAt and Expiry
+ const now = Math.floor(Date.now() / 1000); // Unix timestamp in seconds
+ this.newAsk.CreatedAt = now;
+ // Set expiry to 365 days from now
+ this.newAsk.Expiry = now + 365 * 24 * 60 * 60; // 365 days in seconds
+
+ // Prepare the ask object to send to the server
+ const askToSend = {
+ SpID: this.newAsk.SpID,
+ Price: priceAtto,
+ VerifiedPrice: verifiedPriceAtto,
+ MinSize: parseInt(this.newAsk.MinSize),
+ MaxSize: parseInt(this.newAsk.MaxSize),
+ CreatedAt: this.newAsk.CreatedAt,
+ Expiry: this.newAsk.Expiry,
+ };
+
+ try {
+ await RPCCall('SetStorageAsk', [askToSend]);
+ // Reload data
+ await this.loadData();
+ // Close the form
+ this.updatingSpID = null;
+ } catch (error) {
+ console.error('Failed to set storage ask:', error);
+ }
+ }
+
+ // Conversion constants
+ get EPOCHS_IN_MONTH() {
+ return 86400;
+ }
+ get GIB_IN_TIB() {
+ return 1024;
+ }
+ get ATTOFIL_PER_FIL() {
+ return 1e18;
+ }
+
+ // Convert attoFIL/GiB/Epoch to FIL/TiB/Month
+ attoFilToFilPerTiBPerMonth(attoFilPerGiBPerEpoch) {
+ const filPerTiBPerMonth = (attoFilPerGiBPerEpoch * this.GIB_IN_TIB * this.EPOCHS_IN_MONTH) / this.ATTOFIL_PER_FIL;
+ return filPerTiBPerMonth.toFixed(8); // Limit to 8 decimal places
+ }
+
+ // Convert FIL/TiB/Month to attoFIL/GiB/Epoch
+ filToAttoFilPerGiBPerEpoch(filPerTiBPerMonth) {
+ const attoFilPerGiBPerEpoch = (filPerTiBPerMonth * this.ATTOFIL_PER_FIL) / this.GIB_IN_TIB / this.EPOCHS_IN_MONTH;
+ return Math.round(attoFilPerGiBPerEpoch); // Round to nearest integer
+ }
+
+ render() {
+ return html`
+
+
+
+
+
Storage Asks
+
+
+
+ SP ID |
+ Price (FIL/TiB/Month) |
+ Price (attoFIL/GiB/Epoch) |
+ Verified Price (FIL/TiB/Month) |
+ Verified Price (attoFIL/GiB/Epoch) |
+ Min Size |
+ Max Size |
+ Sequence |
+ Actions |
+
+
+
+ ${this.actorList.map((spID) => {
+ const ask = this.spAsks.get(spID);
+ return html`
+
+ f0${spID} |
+ ${ask ? this.attoFilToFilPerTiBPerMonth(ask.Price) : '-'} |
+ ${ask ? ask.Price : '-'} |
+ ${ask ? this.attoFilToFilPerTiBPerMonth(ask.VerifiedPrice) : '-'} |
+ ${ask ? ask.VerifiedPrice : '-'} |
+ ${ask ? this.formatBytes(ask.MinSize) : '-'} |
+ ${ask ? this.formatBytes(ask.MaxSize) : '-'} |
+ ${ask ? ask.Sequence : '-'} |
+
+
+ |
+
+ `;
+ })}
+
+
+ ${this.updatingSpID !== null ? this.renderUpdateForm() : ''}
+
+ `;
+ }
+
+ renderUpdateForm() {
+ return html`
+
+
+ `;
+ }
+
+ handlePriceFILInput(e) {
+ const value = e.target.value;
+ this.newAsk.PriceFIL = value;
+ if (value) {
+ this.newAsk.Price = this.filToAttoFilPerGiBPerEpoch(parseFloat(value));
+ } else {
+ this.newAsk.Price = '';
+ }
+ this.requestUpdate(); // Ensure the component re-renders
+ }
+
+ handleVerifiedPriceFILInput(e) {
+ const value = e.target.value;
+ this.newAsk.VerifiedPriceFIL = value;
+ if (value) {
+ this.newAsk.VerifiedPrice = this.filToAttoFilPerGiBPerEpoch(parseFloat(value));
+ } else {
+ this.newAsk.VerifiedPrice = '';
+ }
+ this.requestUpdate(); // Ensure the component re-renders
+ }
+
+ handleSubmit(e) {
+ e.preventDefault();
+ this.saveAsk();
+ }
+
+ static styles = css`
+ .modal {
+ position: fixed;
+ top: 0;
+ left: 0;
+ z-index: 1050;
+ width: 100%;
+ height: 100%;
+ overflow: hidden;
+ outline: 0;
+ display: flex;
+ align-items: center;
+ justify-content: center;
+ backdrop-filter: blur(5px);
+ }
+
+ .modal-dialog {
+ max-width: 600px;
+ margin: 1.75rem auto;
+ }
+
+ .modal-content {
+ background-color: var(--color-form-field, #1d1d21);
+ border: 1px solid var(--color-form-default, #808080);
+ border-radius: 0.3rem;
+ box-shadow: 0 0.5rem 1rem rgba(0, 0, 0, 0.5);
+ color: var(--color-text-primary, #FFF);
+ }
+
+ .modal-header,
+ .modal-footer {
+ display: flex;
+ align-items: center;
+ justify-content: space-between;
+ padding: 1rem;
+ border-bottom: 1px solid var(--color-form-default, #808080);
+ }
+
+ .modal-header {
+ border-bottom: none;
+ }
+
+ .modal-body {
+ position: relative;
+ padding: 1rem;
+ }
+
+ .form-label {
+ margin-bottom: 0.5rem;
+ color: var(--color-text-primary, #FFF);
+ }
+
+ .form-control,
+ .form-select {
+ display: block;
+ width: 100%;
+ padding: 0.375rem 0.75rem;
+ font-size: 1rem;
+ line-height: 1.5;
+ color: var(--color-text-primary, #FFF);
+ background-color: var(--color-form-group-1, #484848);
+ background-clip: padding-box;
+ border: 1px solid var(--color-form-default, #808080);
+ border-radius: 0.25rem;
+ transition: border-color 0.15s ease-in-out, box-shadow 0.15s ease-in-out;
+ }
+
+ .form-control:focus,
+ .form-select:focus {
+ color: var(--color-text-primary, #FFF);
+ background-color: var(--color-form-group-1, #484848);
+ border-color: var(--color-primary-light, #8BEFE0);
+ outline: 0;
+ box-shadow: 0 0 0 0.2rem rgba(29, 200, 204, 0.25);
+ }
+
+ .form-text {
+ color: var(--color-text-primary, #FFF);
+ font-size: 0.875em;
+ }
+
+ .modal-backdrop {
+ position: fixed;
+ top: 0;
+ left: 0;
+ z-index: 1040;
+ width: 100vw;
+ height: 100vh;
+ background-color: var(--color-text-secondary, #171717);
+ opacity: 0.5;
+ }
+
+ /* Responsive adjustments */
+ @media (max-width: 576px) {
+ .modal-dialog {
+ max-width: 100%;
+ margin: 0;
+ height: 100%;
+ display: flex;
+ flex-direction: column;
+ justify-content: center;
+ }
+
+ .modal-content {
+ height: auto;
+ border-radius: 0;
+ }
+ }
+ `;
+}
+
+customElements.define('market-asks', MarketAsks);
\ No newline at end of file
diff --git a/web/static/pages/node_info/index.html b/web/static/pages/node_info/index.html
index 228a1a218..9992c7c85 100644
--- a/web/static/pages/node_info/index.html
+++ b/web/static/pages/node_info/index.html
@@ -7,6 +7,7 @@
+
Node Info
@@ -19,6 +20,7 @@ Node Info
+