Skip to content
This repository has been archived by the owner on Aug 2, 2021. It is now read-only.

[WIP] Shed subscription #1014

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
236 changes: 236 additions & 0 deletions swarm/shed/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,12 @@
package shed

import (
"context"
"crypto/rand"
"encoding/binary"
"errors"
"sync"

"github.com/syndtr/goleveldb/leveldb"
)

Expand Down Expand Up @@ -78,6 +84,11 @@ type Index struct {
decodeKeyFunc func(key []byte) (e IndexItem, err error)
encodeValueFunc func(fields IndexItem) (value []byte, err error)
decodeValueFunc func(value []byte) (e IndexItem, err error)

// triggers are used for signaling
// subscriptions to continue iterations.
triggers map[uint64]chan struct{}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

a trigger should be associated with a subscription not an index.
E.g., stream pkg subscribe to bin

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In this code, sending and receiving part of the subscriptions are completely decoupled. Subscription only receives data and controls whether the receiving should stop or terminate. While only index is responsible for writing (putting keys and values) and signaling any subscriptions on such events. So the responsibility is completely divided. Index does not need to track every existing subscription to trigger them, but just needs to call one function, while any subscription can be created and stopped without the need to adjust any trigger invocation.

It is of course possible to have triggers on Subscription, but then, both sending and receiving parts need to be changed when a subscription is created or stopped. Which may not be the problem if we do not create subscriptions dynamically or we want to manage them outside of this package.

For subscribing to a particular bin which is a prefix in front of the timestamp in the index, we would need subscriptions that work on a prefix and to see if we want to manage this subscriptions in general way in shed package or very specific in localstore.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@janos as long as it is possible to have bin specific triggers i am ok with this.

Copy link
Member

@zelig zelig Nov 25, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

but looking at the code i dont think that is possible. Even if trigger is created in NewSubscription it should be assigned as a field of the subscription so that it can be triggered by the user. you could also just give the channel as an argument. Either way it could then just be a lazy event subscription triggered (written to with a default select) if there is a new item relevant for the iteration. Otherwise how would you do this?

All in all, I would eliminate the triggers map too. and provide either the channel as an argument or create it on the subscription and have a function Trigger(). In the latter case you would call Trigger on each bin's subscription when a new item is entered.

Alternatively, we could generalise it within shed if we had 'prefix' hooks on Put and Iterate. In this case the Index API would have SubscribeToPrefix which would subscribe to Put with that prefix. The advantage of this is that performancewise the subscription need to be created only if one iterator reaches the end

that said it would not be the same for every case. For instance garbage collection as well as push sync could be implemented with triggers. under this model, if the iterator function returns stop==true that would put the iterator in a state waiting for the trigger. If trigger was a chan IndexItem we can use trigger to reset the from.
With this we could manage
manage 1) pull sync intervals, 2) garbage collection (stop after GCcount items, then wait till capacity reached and always start from the beginning) and 3) push sync (stop after RetryInterval and start from the beginning)

triggersMu *sync.Mutex
}

// IndexFuncs structure defines functions for encoding and decoding
Expand Down Expand Up @@ -120,6 +131,8 @@ func (db *DB) NewIndex(name string, funcs IndexFuncs) (f Index, err error) {
},
encodeValueFunc: funcs.EncodeValue,
decodeValueFunc: funcs.DecodeValue,
triggers: make(map[uint64]chan struct{}),
triggersMu: new(sync.Mutex),
}, nil
}

Expand Down Expand Up @@ -262,3 +275,226 @@ func (f Index) IterateFrom(start IndexItem, fn IndexIterFunc) (err error) {
}
return it.Error()
}

// Subscription provides methods to control
// and get information about subscription state.
type Subscription struct {
stopChan chan struct{}
zelig marked this conversation as resolved.
Show resolved Hide resolved
onceOnce sync.Once
doneChan chan struct{}
err error
mu sync.RWMutex
}

// Err returns an error that subscription encountered.
// It should be usually called after the Done is read from.
// It is safe to call this function multiple times.
func (s *Subscription) Err() (err error) {
s.mu.RLock()
err = s.err
s.mu.RUnlock()
return err
}

// Done returns a read-only channel that will be closed
// when the subscription is stopped or encountered an error.
func (s *Subscription) Done() <-chan struct{} {
return s.doneChan
}

// Stop terminates the subscription without any error.
// It is safe to call this function multiple times.
func (s *Subscription) Stop() {
zelig marked this conversation as resolved.
Show resolved Hide resolved
s.onceOnce.Do(func() {
close(s.stopChan)
})
}

// NewSubscription starts a new subscription on the index.
// Subscribing is similar to iterating over the index key, but it
// is performend in the background and contiguously over the existing keys
// as well over the new keys when they are put. It is required to signal
// all the iterators to check new keys with TriggerSubscriptions method.
// This provides a greater control over subscription iterators instead to
// trigger subscriptions on Put method or batch writes.
// IndexIterFunc behaves the same as in iterate methods.
// Provided context allows cancellation of created goroutine and Subscription.Err()
// will return appropriate error from context.
func (f Index) NewSubscription(ctx context.Context, fn IndexIterFunc) (s *Subscription, err error) {
return f.newSubscription(ctx, f.prefix, fn)
}

// NewSubscriptionFrom is the same method as NewSubscription, but it
// iterates over the keys from the provided start index.
func (f Index) NewSubscriptionFrom(ctx context.Context, start IndexItem, fn IndexIterFunc) (s *Subscription, err error) {
startKey, err := f.encodeKeyFunc(start)
if err != nil {
return nil, err
}
return f.newSubscription(ctx, startKey, fn)
}

// newSubscription provides base functionality for NewSubscription
// and NewSubscriptionFrom methods.
// It creates a new goroutine which will iterate over existing keys of the index
// and create new iterators when TriggerSubscriptions is called.
func (f Index) newSubscription(ctx context.Context, startKey []byte, fn IndexIterFunc) (s *Subscription, err error) {
// Create a subscription id to be able to remove the channel from the triggers map.
f.triggersMu.Lock()
// Generate new ID.
id, err := f.newSubscriptionID()
if err != nil {
f.triggersMu.Unlock()
return nil, err
}
// trigger is a one size buffered channel for two reasons
// - to be able to signal the first iteration
// - performance on sending to channel in TriggerSubscriptions
trigger := make(chan struct{}, 1)
f.triggers[id] = trigger
f.triggersMu.Unlock()

// send signal for the initial iteration
trigger <- struct{}{}

s = &Subscription{
stopChan: make(chan struct{}),
doneChan: make(chan struct{}),
}

go func() {
// this error will be checked in defer
// and set to Subscription.err
var err error
defer func() {
if err != nil {
s.mu.Lock()
s.err = err
s.mu.Unlock()
}
// signal that the subscription is done
close(s.doneChan)
f.triggersMu.Lock()
// clean up the trigger channel
delete(f.triggers, id)
f.triggersMu.Unlock()
}()
// This flag identifies the first iteration to
// include the start item in it, and to exclude the
// start item in the next ones, as they are already sent
// in previous iterations.
firstIteration := true
for {
select {
case <-trigger:
// This closure is to provide a clean defer for
// iteration release.
err = func() error {
it := f.db.NewIterator()
defer it.Release()

ok := it.Seek(startKey)
if firstIteration {
// The firs iteration will set the flag to false
// to provide information to all next iterations.
firstIteration = false
} else {
// All iterations but first will start from the
// startKey+1 as the start key for all non-first
// iterations is already processed.
ok = it.Next()
}

for ; ok; ok = it.Next() {
key := it.Key()
if key[0] != f.prefix[0] {
break
}
keyIndexItem, err := f.decodeKeyFunc(key)
if err != nil {
return err
}
valueIndexItem, err := f.decodeValueFunc(it.Value())
if err != nil {
return err
}
stop, err := fn(keyIndexItem.Merge(valueIndexItem))
if err != nil {
return err
}
startKey = key
if stop {
// Q: should the whole subscription stop or just this iteration?
s.Stop()
break
}
select {
case <-s.stopChan:
return nil
case <-ctx.Done():
return ctx.Err()
default:
}
}
return it.Error()
}()
if err != nil {
return
}
case <-s.stopChan:
return
case <-ctx.Done():
if err == nil {
err = ctx.Err()
}
return
}
}
}()

return s, nil
}

// newSubscriptionID generates a new subscription id as a random number.
func (f Index) newSubscriptionID() (id uint64, err error) {
b := make([]byte, 8)
newID := func() (uint64, error) {
_, err = rand.Read(b)
if err != nil {
return 0, err
}
return binary.BigEndian.Uint64(b), nil
}
id, err = newID()
if err != nil {
return 0, err
}
// check up to 100 times if this id already exists
for i := 0; i < 100; i++ {
if _, ok := f.triggers[id]; !ok {
// this id is unique, return it
return id, nil
}
id, err = newID()
if err != nil {
return 0, err
}
}
return 0, errors.New("unable to generate subscription id")
}

// TriggerSubscriptions signals to all index subscriptions
// that they should continue iterating over the index keys
// where they stopped in the last iteration. This method
// should be called when new data is put to the index.
// It is not automatically called by the index Put method
// to allow for optimizations and for signaling of batch
// writes.
func (f Index) TriggerSubscriptions() {
for _, t := range f.triggers {
select {
case t <- struct{}{}:
default:
}
}
}
Loading