From fca85a2bdd85dfadbf72aa2f56fd97e68117e6e9 Mon Sep 17 00:00:00 2001 From: Michael Avila Date: Mon, 25 Mar 2019 09:03:09 -0700 Subject: [PATCH] Query for provider head/tail License: MIT Signed-off-by: Michael Avila --- go.mod | 4 +-- go.sum | 10 +++---- provider/queue.go | 66 +++++++++++++++++++++++++----------------- provider/queue_test.go | 26 ++++++++++++++++- 4 files changed, 70 insertions(+), 36 deletions(-) diff --git a/go.mod b/go.mod index a4a9f9c55f2..85cc579d8b4 100644 --- a/go.mod +++ b/go.mod @@ -20,10 +20,10 @@ require ( github.com/ipfs/go-blockservice v0.0.3 github.com/ipfs/go-cid v0.0.1 github.com/ipfs/go-cidutil v0.0.1 - github.com/ipfs/go-datastore v0.0.1 + github.com/ipfs/go-datastore v0.0.2 github.com/ipfs/go-detect-race v0.0.1 github.com/ipfs/go-ds-badger v0.0.2 - github.com/ipfs/go-ds-flatfs v0.0.1 + github.com/ipfs/go-ds-flatfs v0.0.2 github.com/ipfs/go-ds-leveldb v0.0.1 github.com/ipfs/go-ds-measure v0.0.1 github.com/ipfs/go-fs-lock v0.0.1 diff --git a/go.sum b/go.sum index 0fb3221bc3c..b7a50cef8ba 100644 --- a/go.sum +++ b/go.sum @@ -121,12 +121,14 @@ github.com/ipfs/go-cidutil v0.0.1 h1:UpDQI2LrihqOGY2mHaMhjrhh1DJ14N/58BQb7lKXvlQ github.com/ipfs/go-cidutil v0.0.1/go.mod h1:/0H649ymJksNEZvBAkM18HIctk7tkONH9tspTeLok48= github.com/ipfs/go-datastore v0.0.1 h1:AW/KZCScnBWlSb5JbnEnLKFWXL224LBEh/9KXXOrUms= github.com/ipfs/go-datastore v0.0.1/go.mod h1:d4KVXhMt913cLBEI/PXAy6ko+W7e9AhyAKBGh803qeE= +github.com/ipfs/go-datastore v0.0.2 h1:Blyjq95atbxmCHSaDt42phIhf9NGLflzwq/95FqsNs0= +github.com/ipfs/go-datastore v0.0.2/go.mod h1:d4KVXhMt913cLBEI/PXAy6ko+W7e9AhyAKBGh803qeE= github.com/ipfs/go-detect-race v0.0.1 h1:qX/xay2W3E4Q1U7d9lNs1sU9nvguX0a7319XbyQ6cOk= github.com/ipfs/go-detect-race v0.0.1/go.mod h1:8BNT7shDZPo99Q74BpGMK+4D8Mn4j46UU0LZ723meps= github.com/ipfs/go-ds-badger v0.0.2 h1:7ToQt7QByBhOTuZF2USMv+PGlMcBC7FW7FdgQ4FCsoo= github.com/ipfs/go-ds-badger v0.0.2/go.mod h1:Y3QpeSFWQf6MopLTiZD+VT6IC1yZqaGmjvRcKeSGij8= -github.com/ipfs/go-ds-flatfs v0.0.1 h1:yqWwRYFOGNClUL7V2jvcx4KMMso1Jv+pgQzsv9/gWBs= -github.com/ipfs/go-ds-flatfs v0.0.1/go.mod h1:YsMGWjUieue+smePAWeH/YhHtlmEMnEGhiwIn6K6rEM= +github.com/ipfs/go-ds-flatfs v0.0.2 h1:1zujtU5bPBH6B8roE+TknKIbBCrpau865xUk0dH3x2A= +github.com/ipfs/go-ds-flatfs v0.0.2/go.mod h1:YsMGWjUieue+smePAWeH/YhHtlmEMnEGhiwIn6K6rEM= github.com/ipfs/go-ds-leveldb v0.0.1 h1:Z0lsTFciec9qYsyngAw1f/czhRU35qBLR2vhavPFgqA= github.com/ipfs/go-ds-leveldb v0.0.1/go.mod h1:feO8V3kubwsEF22n0YRQCffeb79OOYIykR4L04tMOYc= github.com/ipfs/go-ds-measure v0.0.1 h1:PrCueug+yZLkDCOthZTXKinuoCal/GvlAT7cNxzr03g= @@ -143,8 +145,6 @@ github.com/ipfs/go-ipfs-chunker v0.0.1 h1:cHUUxKFQ99pozdahi+uSC/3Y6HeRpi9oTeUHbE github.com/ipfs/go-ipfs-chunker v0.0.1/go.mod h1:tWewYK0we3+rMbOh7pPFGDyypCtvGcBFymgY4rSDLAw= github.com/ipfs/go-ipfs-cmdkit v0.0.1 h1:X6YXEAjUljTzevE6DPUKXSqcgf+4FXzcn5B957F5MXo= github.com/ipfs/go-ipfs-cmdkit v0.0.1/go.mod h1:9FtbMdUabcSqv/G4/8WCxSLxkZxn/aZEFrxxqnVcRbg= -github.com/ipfs/go-ipfs-cmds v0.0.3 h1:QvNUE8lslNQghxXf6vzV1ZoMQCDDAtKG8f2oINiRew4= -github.com/ipfs/go-ipfs-cmds v0.0.3/go.mod h1:1QVgxSgenZvOMGVC/XUTC7tJxRBGPLxYvpgPpCi3DUk= github.com/ipfs/go-ipfs-cmds v0.0.4 h1:Iq4I8irWw5TmHe/4pjSyYJLbYkkdMOgHVe8ofJmPa4k= github.com/ipfs/go-ipfs-cmds v0.0.4/go.mod h1:1QVgxSgenZvOMGVC/XUTC7tJxRBGPLxYvpgPpCi3DUk= github.com/ipfs/go-ipfs-config v0.0.1 h1:6ED08emzI1imdsAjixFi2pEyZxTVD5ECKtCOxLBx+Uc= @@ -220,8 +220,6 @@ github.com/jbenet/go-cienv v0.0.0-20150120210510-1bb1476777ec h1:DQqZhhDvrTrEQ3Q github.com/jbenet/go-cienv v0.0.0-20150120210510-1bb1476777ec/go.mod h1:rGaEvXB4uRSZMmzKNLoXvTu1sfx+1kv/DojUlPrSZGs= github.com/jbenet/go-context v0.0.0-20150711004518-d14ea06fba99 h1:BQSFePA1RWJOlocH6Fxy8MmwDt+yVQYULKfN0RoTN8A= github.com/jbenet/go-context v0.0.0-20150711004518-d14ea06fba99/go.mod h1:1lJo3i6rXxKeerYnT8Nvf0QmHCRC1n8sfWVwXF2Frvo= -github.com/jbenet/go-is-domain v0.0.0-20160119110217-ba9815c809e0 h1:qxMncUW0TzViA3REiN3/YgVOoekVtUtEY0O/j/Qlctg= -github.com/jbenet/go-is-domain v0.0.0-20160119110217-ba9815c809e0/go.mod h1:I9DYFcJAixF5f9iOu/9oC451/bq+QDTaLGznkcJPWgg= github.com/jbenet/go-is-domain v1.0.2 h1:11r5MSptcNFZyBoqubBQnVMUKRWLuRjL1banaIk+iYo= github.com/jbenet/go-is-domain v1.0.2/go.mod h1:xbRLRb0S7FgzDBTJlguhDVwLYM/5yNtvktxj2Ttfy7Q= github.com/jbenet/go-random v0.0.0-20190219211222-123a90aedc0c h1:uUx61FiAa1GI6ZmVd2wf2vULeQZIKG66eybjNXKYCz4= diff --git a/provider/queue.go b/provider/queue.go index a3268e10933..918bdcbd769 100644 --- a/provider/queue.go +++ b/provider/queue.go @@ -2,7 +2,7 @@ package provider import ( "context" - "math" + "fmt" "strconv" "strings" @@ -32,7 +32,7 @@ type Queue struct { // NewQueue creates a queue for cids func NewQueue(ctx context.Context, name string, ds datastore.Datastore) (*Queue, error) { namespaced := namespace.Wrap(ds, datastore.NewKey("/"+name+"/queue/")) - head, tail, err := getQueueHeadTail(ctx, name, namespaced) + head, tail, err := getQueueHeadTail(ctx, namespaced) if err != nil { return nil, err } @@ -142,40 +142,52 @@ func (q *Queue) work() { } func (q *Queue) queueKey(id uint64) datastore.Key { - return datastore.NewKey(strconv.FormatUint(id, 10)) + s := fmt.Sprintf("%016X", id) + return datastore.NewKey(s) } -// crawl over the queue entries to find the head and tail -func getQueueHeadTail(ctx context.Context, name string, datastore datastore.Datastore) (uint64, uint64, error) { - q := query.Query{} - results, err := datastore.Query(q) +func getQueueHeadTail(ctx context.Context, datastore datastore.Datastore) (uint64, uint64, error) { + head, err := getQueueHead(datastore) if err != nil { return 0, 0, err } + tail, err := getQueueTail(datastore) + if err != nil { + return 0, 0, err + } + return head, tail, nil +} - var tail uint64 - var head uint64 = math.MaxUint64 - for entry := range results.Next() { - trimmed := strings.TrimPrefix(entry.Key, "/") - id, err := strconv.ParseUint(trimmed, 10, 64) - if err != nil { - return 0, 0, err - } +func getQueueHead(ds datastore.Datastore) (uint64, error) { + return getFirstIDByOrder(ds, query.OrderByKey{}) +} - if id < head { - head = id - } +func getQueueTail(ds datastore.Datastore) (uint64, error) { + tail, err := getFirstIDByOrder(ds, query.OrderByKeyDescending{}) + if err != nil { + return 0, err + } + if tail > 0 { + tail++ + } + return tail, nil +} - if (id + 1) > tail { - tail = (id + 1) - } +func getFirstIDByOrder(ds datastore.Datastore, order query.Order) (uint64, error) { + q := query.Query{Orders: []query.Order{order}} + results, err := ds.Query(q) + if err != nil { + return 0, err } - if err := results.Close(); err != nil { - return 0, 0, err + defer results.Close() + r, ok := results.NextSync() + if !ok { + return 0, nil } - if head == math.MaxUint64 { - head = 0 + trimmed := strings.TrimPrefix(r.Key, "/") + id, err := strconv.ParseUint(trimmed, 16, 64) + if err != nil { + return 0, err } - - return head, tail, nil + return id, nil } diff --git a/provider/queue_test.go b/provider/queue_test.go index e1b74878ea5..2857da0a93e 100644 --- a/provider/queue_test.go +++ b/provider/queue_test.go @@ -12,7 +12,7 @@ import ( func makeCids(n int) []cid.Cid { cids := make([]cid.Cid, 0, n) - for i := 0; i < 10; i++ { + for i := 0; i < n; i++ { c := blockGenerator.Next().Cid() cids = append(cids, c) } @@ -129,3 +129,27 @@ func TestInitialization(t *testing.T) { assertOrdered(cids[5:], queue, t) } + +func TestInitializationWithManyCids(t *testing.T) { + ctx := context.Background() + defer ctx.Done() + + ds := sync.MutexWrap(datastore.NewMapDatastore()) + queue, err := NewQueue(ctx, "test", ds) + if err != nil { + t.Fatal(err) + } + + cids := makeCids(25) + for _, c := range cids { + queue.Enqueue(c) + } + + // make a new queue, same data + queue, err = NewQueue(ctx, "test", ds) + if err != nil { + t.Fatal(err) + } + + assertOrdered(cids, queue, t) +}