Skip to content

Commit

Permalink
Orderbook and bugfixes
Browse files Browse the repository at this point in the history
  • Loading branch information
bartekn committed Jul 3, 2019
1 parent c9e7fc7 commit 2371386
Show file tree
Hide file tree
Showing 11 changed files with 355 additions and 216 deletions.
3 changes: 2 additions & 1 deletion exp/ingest/io/ledger_transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,8 @@ func (c *Change) AccountSignersChanged() bool {
}

// GetChanges returns a developer friendly representation of LedgerEntryChanges.
// TODO this should include TransactionMetaV1.txChanges too!
// Currently it results operations related LedgerEntryChanges only.
// TODO this should include TransactionMetaV1.txChanges and fee changes too!
func (t *LedgerTransaction) GetChanges() []Change {
changes := []Change{}

Expand Down
1 change: 1 addition & 0 deletions exp/ingest/live_session.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ func (s *LiveSession) Resume(ledgerSequence uint32) error {
ledgerReader, err := ledgerAdapter.GetLedger(ledgerSequence)
if err != nil {
if err == io.ErrNotFound {
// TODO make the idle time smaller
time.Sleep(time.Second)
continue
}
Expand Down
34 changes: 26 additions & 8 deletions exp/orderbook/batch.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
//lint:file-ignore U1000 this package is currently unused but it will be used in a future PR

package orderbook

import (
"sync"

"github.com/stellar/go/support/errors"
"github.com/stellar/go/xdr"
)
Expand All @@ -23,10 +23,10 @@ const (
type BatchedUpdates interface {
// AddOffer will queue an operation to add the given offer to the order book
AddOffer(offer xdr.OfferEntry) BatchedUpdates
// AddOffer will queue an operation to remove the given offer from the order book
// RemoveOffer will queue an operation to remove the given offer from the order book
RemoveOffer(offerID xdr.Int64) BatchedUpdates
// Apply will attempt to apply all the updates in the batch to the order book
Apply() error
Apply()
}

type orderBookOperation struct {
Expand All @@ -36,13 +36,22 @@ type orderBookOperation struct {
}

type orderBookBatchedUpdates struct {
// mutex is protecting access to operations from multiple go routines
mutex sync.Mutex
operations []orderBookOperation
orderbook *OrderBookGraph
committed bool
}

// AddOffer will queue an operation to add the given offer to the order book
func (tx *orderBookBatchedUpdates) AddOffer(offer xdr.OfferEntry) BatchedUpdates {
tx.mutex.Lock()
defer tx.mutex.Unlock()

if tx.committed {
panic(errBatchAlreadyApplied)
}

tx.operations = append(tx.operations, orderBookOperation{
operationType: addOfferOperationType,
offerID: offer.OfferId,
Expand All @@ -54,6 +63,13 @@ func (tx *orderBookBatchedUpdates) AddOffer(offer xdr.OfferEntry) BatchedUpdates

// AddOffer will queue an operation to remove the given offer from the order book
func (tx *orderBookBatchedUpdates) RemoveOffer(offerID xdr.Int64) BatchedUpdates {
tx.mutex.Lock()
defer tx.mutex.Unlock()

if tx.committed {
panic(errBatchAlreadyApplied)
}

tx.operations = append(tx.operations, orderBookOperation{
operationType: removeOfferOperationType,
offerID: offerID,
Expand All @@ -63,11 +79,15 @@ func (tx *orderBookBatchedUpdates) RemoveOffer(offerID xdr.Int64) BatchedUpdates
}

// Apply will attempt to apply all the updates in the batch to the order book
func (tx *orderBookBatchedUpdates) Apply() error {
func (tx *orderBookBatchedUpdates) Apply() {
tx.mutex.Lock()
defer tx.mutex.Unlock()

tx.orderbook.lock.Lock()
defer tx.orderbook.lock.Unlock()

if tx.committed {
return errBatchAlreadyApplied
panic(errBatchAlreadyApplied)
}
tx.committed = true

Expand All @@ -85,6 +105,4 @@ func (tx *orderBookBatchedUpdates) Apply() error {
panic(errors.New("invalid operation type"))
}
}

return nil
}
2 changes: 0 additions & 2 deletions exp/orderbook/edges.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
//lint:file-ignore U1000 this package is currently unused but it will be used in a future PR

package orderbook

import (
Expand Down
35 changes: 32 additions & 3 deletions exp/orderbook/graph.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
//lint:file-ignore U1000 this package is currently unused but it will be used in a future PR
package orderbook

import (
Expand Down Expand Up @@ -35,15 +34,45 @@ type OrderBookGraph struct {
// tradingPairForOffer maps an offer id to the assets which are being exchanged
// in the given offer
tradingPairForOffer map[xdr.Int64]tradingPair
lock sync.RWMutex
// batchedUpdates is internal batch of updates to this graph. Users can
// create multiple batches using `Batch()` method but sometimes only one
// batch is enough.
batchedUpdates BatchedUpdates
lock sync.RWMutex
}

// NewOrderBookGraph constructs a new OrderBookGraph
func NewOrderBookGraph() *OrderBookGraph {
return &OrderBookGraph{
graph := &OrderBookGraph{
edgesForSellingAsset: map[string]edgeSet{},
tradingPairForOffer: map[xdr.Int64]tradingPair{},
}

graph.batchedUpdates = graph.Batch()
return graph
}

// AddOffer will queue an operation to add the given offer to the order book in
// the internal batch.
// You need to run Apply() to apply all enqueued operations.
func (graph *OrderBookGraph) AddOffer(offer xdr.OfferEntry) BatchedUpdates {
graph.batchedUpdates.AddOffer(offer)
return graph
}

// RemoveOffer will queue an operation to remove the given offer from the order book in
// the internal batch.
// You need to run Apply() to apply all enqueued operations.
func (graph *OrderBookGraph) RemoveOffer(offerID xdr.Int64) BatchedUpdates {
graph.batchedUpdates.RemoveOffer(offerID)
return graph
}

// Apply will attempt to apply all the updates in the internal batch to the order book.
// When Apply is successful, a new empty, instance of internal batch will be created.
func (graph *OrderBookGraph) Apply() {
graph.batchedUpdates.Apply()
graph.batchedUpdates = graph.Batch()
}

// Batch creates a new batch of order book updates which can be applied
Expand Down
112 changes: 0 additions & 112 deletions exp/orderbook/stream.go

This file was deleted.

8 changes: 5 additions & 3 deletions exp/support/pipeline/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ type PipelineInterface interface {
AddPreProcessingHook(hook func(context.Context) error)
AddPostProcessingHook(hook func(context.Context, error) error)
Shutdown()
PrintStatus()
}

var _ PipelineInterface = &Pipeline{}
Expand Down Expand Up @@ -173,9 +174,10 @@ type Processor interface {
// Returns processor name. Helpful for errors, debuging and reports.
Name() string
// Reset resets internal state of the processor. This is run by the pipeline
// everytime the processing is done. It is extremely important to implement
// this method, otherwise internal state of the processor will be maintained
// between pipeline runs and may result in invalid data.
// everytime before the pipeline starts running.
// It is extremely important to implement this method, otherwise internal
// state of the processor will be maintained between pipeline runs and may
// result in an invalid data.
Reset()
}

Expand Down
15 changes: 8 additions & 7 deletions exp/support/pipeline/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -209,24 +209,25 @@ func (p *Pipeline) processStateNode(ctx context.Context, store *Store, node *Pip
go func() {
wg.Wait()
finishUpdatingStats <- true

select {
case <-ctx.Done():
if p.cancelledWithErr != nil {
errorChan <- nil
}
// else: Do nothing, err already sent to a channel...
default:
if node == p.root {
err := p.sendPostProcessingHooks(readCloser.GetContext())
if err != nil {
errorChan <- errors.Wrap(err, "Error running pre-hook")
break
}
}
errorChan <- nil
}

// If all of the children of the current node are done and the current node is root
// it means that pipeline is done too.
if node == p.root {
err := p.sendPostProcessingHooks(readCloser.GetContext())
if err != nil {
errorChan <- errors.Wrap(err, "Error running pre-hook")
}

p.setRunning(false)
}
}()
Expand Down
Loading

0 comments on commit 2371386

Please sign in to comment.