Skip to content

Commit

Permalink
add persisted watcher library (#4307)
Browse files Browse the repository at this point in the history
# Watcher Library

## Overview

The Watcher Library is an internal component of the Bacalhau project
that provides a robust event watching and processing system. It's
designed to efficiently store, retrieve, and process events. The library
ensures events are stored in a durable, ordered manner, allowing for
consistent and reliable event processing. It supports features like
checkpointing, filtering, and long-polling, while maintaining the
ability to replay events from any point in the event history.


## Key Features

1. **Ordered Event Processing**: Events are processed in the exact order
they were created, ensuring consistency and predictability in event
handling.
2. **Durability**: Events are stored persistently in BoltDB, ensuring
they survive system restarts or crashes.
3. **Replayability**: The system allows replaying events from any point
in history, facilitating data recovery, debugging, and system
reconciliation.
4. **Concurrency**: Multiple watchers can process events concurrently,
improving system throughput.
5. **Filtering**: Watchers can filter events based on object types and
operations, allowing for targeted event processing.
6. **Checkpointing**: Watchers can save their progress and resume from
where they left off, enhancing reliability and efficiency.
7. **Long-polling**: Efficient event retrieval with support for
long-polling, reducing unnecessary network traffic and database queries.
8. **Garbage Collection**: Automatic cleanup of old events to manage
storage while maintaining the ability to replay from critical points.
9. **Flexible Event Iteration**: Different types of iterators for
various use cases, including the ability to start from the oldest event,
the latest event, or any specific point in the event history.


## Key Components

1. **Registry**: Manages multiple watchers and provides methods to
create and manage watchers.
2. **Watcher**: Represents a single event watcher that processes events
sequentially.
3. **EventStore**: Responsible for storing and retrieving events, with
BoltDB as the default implementation.
4. **EventHandler**: Interface for handling individual events.
5. **Serializer**: Handles the serialization and deserialization of
events.

## Core Concepts

### Event

An `Event` represents a single occurrence in the system. It has the
following properties:

- `SeqNum`: A unique, sequential identifier for the event.
- `Operation`: The type of operation (Create, Update, Delete).
- `ObjectType`: The type of object the event relates to.
- `Object`: The actual data associated with the event.
- `Timestamp`: When the event occurred.

### EventStore

The `EventStore` is responsible for persisting events and providing
methods to retrieve them. It uses BoltDB as the underlying storage
engine and supports features like caching, checkpointing, and garbage
collection.

### Registry

The `Registry` manages multiple watchers. It's the main entry point for
components that want to subscribe to events.

### Watcher

A `Watcher` represents a single subscriber to events. It processes
events sequentially and can be configured with filters and checkpoints.

### EventIterator

An `EventIterator` defines the starting position for reading events.
There are four types of iterators:

1. **TrimHorizonIterator**: Starts from the oldest available event.
2. **LatestIterator**: Starts from the latest available event.
3. **AtSequenceNumberIterator**: Starts at a specific sequence number.
4. **AfterSequenceNumberIterator**: Starts after a specific sequence
number.

## Usage

Here's how you typically use the Watcher library within Bacalhau:

1. Create an EventStore:

```go
db, _ := bbolt.Open("events.db", 0600, nil)
store, _ := boltdb.NewEventStore(db)
```

2. Create a Registry:
```go
registry := watcher.NewRegistry(store)
```

3. Implement an EventHandler:
```go
type MyHandler struct{}

func (h *MyHandler) HandleEvent(ctx context.Context, event watcher.Event) error {
    // Process the event
    return nil
}
```


4. Start watching for events:
```go
watcher, _ := registry.Watch(ctx, "my-watcher", &MyHandler{}, 
    watcher.WithFilter(watcher.EventFilter{
        ObjectTypes: []string{"Job", "Execution"},
        Operations: []watcher.Operation{watcher.OperationCreate, watcher.OperationUpdate},
    }),
)
```

5. Store events:
```go
store.StoreEvent(ctx, watcher.OperationCreate, "Job", jobData)
```


## Configuration

### Watch Configuration

When creating a watcher, you can configure it with various options:

- `WithInitialEventIterator(iterator EventIterator)`: Sets the starting
position for watching if no checkpoint is found.
- `WithFilter(filter EventFilter)`: Sets the event filter for watching.
- `WithBufferSize(size int)`: Sets the size of the event buffer.
- `WithBatchSize(size int)`: Sets the number of events to fetch in each
batch.
- `WithInitialBackoff(backoff time.Duration)`: Sets the initial backoff
duration for retries.
- `WithMaxBackoff(backoff time.Duration)`: Sets the maximum backoff
duration for retries.
- `WithMaxRetries(maxRetries int)`: Sets the maximum number of retries
for event handling.
- `WithRetryStrategy(strategy RetryStrategy)`: Sets the retry strategy
for event handling.

Example:

```go
watcher, err := registry.Watch(ctx, "my-watcher", &MyHandler{},
    watcher.WithInitialEventIterator(watcher.TrimHorizonIterator()),
    watcher.WithFilter(watcher.EventFilter{
        ObjectTypes: []string{"Job", "Execution"},
        Operations: []watcher.Operation{watcher.OperationCreate, watcher.OperationUpdate},
    }),
    watcher.WithBufferSize(1000),
    watcher.WithBatchSize(100),
    watcher.WithMaxRetries(3),
    watcher.WithRetryStrategy(watcher.RetryStrategyBlock),
)
```

### EventStore Configuration (BoltDB)

The BoltDB EventStore can be configured with various options:

- `WithEventsBucket(name string)`: Sets the name of the bucket used to
store events.
- `WithCheckpointBucket(name string)`: Sets the name of the bucket used
to store checkpoints.
- `WithEventSerializer(serializer watcher.Serializer)`: Sets the
serializer used for events.
- `WithCacheSize(size int)`: Sets the size of the LRU cache used to
store events.
- `WithLongPollingTimeout(timeout time.Duration)`: Sets the timeout
duration for long-polling requests.
- `WithGCAgeThreshold(threshold time.Duration)`: Sets the age threshold
for event pruning.
- `WithGCCadence(cadence time.Duration)`: Sets the interval at which
garbage collection runs.
- `WithGCMaxRecordsPerRun(max int)`: Sets the maximum number of records
to process in a single GC run.
- `WithGCMaxDuration(duration time.Duration)`: Sets the maximum duration
for a single GC run.

Example:

```go
store, err := boltdb.NewEventStore(db,
    boltdb.WithEventsBucket("myEvents"),
    boltdb.WithCheckpointBucket("myCheckpoints"),
    boltdb.WithCacheSize(1000),
    boltdb.WithLongPollingTimeout(10*time.Second),
)
```


## Best Practices

1. Use meaningful watcher IDs to easily identify different components
subscribing to events.
2. Implement error handling in your `EventHandler` to ensure robust
event processing.
3. Use appropriate filters to minimize unnecessary event processing.
4. Regularly checkpoint your watchers to enable efficient restarts.
5. Monitor watcher stats to ensure they're keeping up with event volume.

## Troubleshooting

1. If a watcher is falling behind, consider increasing the batch size or
optimizing the event handling logic.
2. For performance issues, check the BoltDB file size and consider
tuning the garbage collection parameters.


## Future Improvements
1. Enhanced monitoring and metrics.
  • Loading branch information
wdbaruni authored Aug 8, 2024
1 parent f4bef24 commit f82ca46
Show file tree
Hide file tree
Showing 32 changed files with 4,082 additions and 12 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,7 @@ require (
github.com/hashicorp/errwrap v1.1.0 // indirect
github.com/hashicorp/go-cleanhttp v0.5.2 // indirect
github.com/hashicorp/golang-lru v1.0.2 // indirect
github.com/hashicorp/golang-lru/v2 v2.0.7 // indirect
github.com/hashicorp/golang-lru/v2 v2.0.7
github.com/hashicorp/hcl v1.0.0 // indirect
github.com/inconshreveable/mousetrap v1.1.0 // indirect
github.com/ipfs/bbloom v0.0.4 // indirect
Expand Down
49 changes: 40 additions & 9 deletions pkg/compute/store/boltdb/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,20 @@ package boltdb

import (
"context"
"errors"
"fmt"
"reflect"
"sort"
"sync"
"time"

"github.com/bacalhau-project/bacalhau/pkg/compute/store"
"github.com/bacalhau-project/bacalhau/pkg/lib/marshaller"
"github.com/rs/zerolog/log"
bolt "go.etcd.io/bbolt"

"github.com/bacalhau-project/bacalhau/pkg/compute/store"
"github.com/bacalhau-project/bacalhau/pkg/lib/marshaller"
"github.com/bacalhau-project/bacalhau/pkg/lib/watcher"
boltdb_watcher "github.com/bacalhau-project/bacalhau/pkg/lib/watcher/boltdb"
)

const (
Expand All @@ -22,6 +27,9 @@ const (
BucketHistoryName = "execution-history"
BucketJobIndexName = "execution-index"
BucketLiveIndexName = "execution-live-index"

BucketEventsName = "events"
BucketCheckpointsName = "checkpoints"
)

// Store represents an execution store that is backed by a boltdb database
Expand Down Expand Up @@ -69,6 +77,7 @@ type Store struct {

starting sync.WaitGroup
stateCounter *StateCounter
eventStore *boltdb_watcher.EventStore
}

// NewStore creates a new store backed by a boltdb database at the
Expand All @@ -78,7 +87,7 @@ type Store struct {
// it would mean later transactions will fail unless they obtain their
// own reference to the bucket.
func NewStore(ctx context.Context, dbPath string) (*Store, error) {
store := &Store{
s := &Store{
marshaller: marshaller.NewJSONMarshaller(),
starting: sync.WaitGroup{},
stateCounter: NewStateCounter(),
Expand All @@ -87,14 +96,14 @@ func NewStore(ctx context.Context, dbPath string) (*Store, error) {

database, err := GetDatabase(dbPath)
if err != nil {
if err == bolt.ErrTimeout {
if errors.Is(err, bolt.ErrTimeout) {
return nil, fmt.Errorf("timed out while opening database, is file %q in use?", dbPath)
}
return nil, err
}

store.database = database
err = store.database.Update(func(tx *bolt.Tx) error {
s.database = database
err = s.database.Update(func(tx *bolt.Tx) error {
_, err = tx.CreateBucketIfNotExists([]byte(BucketExecutionsName))
if err != nil {
return err
Expand Down Expand Up @@ -122,11 +131,22 @@ func NewStore(ctx context.Context, dbPath string) (*Store, error) {
return nil, fmt.Errorf("error creating database structure: %s", err)
}

eventObjectSerializer := watcher.NewJSONSerializer()
err = eventObjectSerializer.RegisterType("LocalStateHistory", reflect.TypeOf(store.LocalStateHistory{}))
if err != nil {
return nil, fmt.Errorf("failed to register LocalStateHistory type: %w", err)
}
s.eventStore, err = boltdb_watcher.NewEventStore(database,
boltdb_watcher.WithEventsBucket(BucketEventsName),
boltdb_watcher.WithCheckpointBucket(BucketCheckpointsName),
boltdb_watcher.WithEventSerializer(eventObjectSerializer),
)

// Populate the state counter for the
store.starting.Add(1)
go store.populateStateCounter(ctx)
s.starting.Add(1)
go s.populateStateCounter(ctx)

return store, nil
return s, nil
}

// getExecutionsBucket helper gets a reference to the executions bucket within
Expand Down Expand Up @@ -337,6 +357,7 @@ func (s *Store) CreateExecution(ctx context.Context, localExecutionState store.L
// and we won't rollback
s.stateCounter.IncrementState(localExecutionState.State, 1)
}

return
})
}
Expand Down Expand Up @@ -492,6 +513,11 @@ func (s *Store) appendHistory(
return err
}
seq := fmt.Sprintf("%03d", seqNum)

err = s.eventStore.StoreEventTx(tx, watcher.OperationCreate, "LocalStateHistory", historyEntry)
if err != nil {
return err
}
return executionHistoryBucket.Put([]byte(seq), historyEntryData)
}

Expand Down Expand Up @@ -540,6 +566,11 @@ func (s *Store) deleteExecution(tx *bolt.Tx, executionID string) error {
return nil
}

// GetEventStore returns the event store for the execution store
func (s *Store) GetEventStore() watcher.EventStore {
return s.eventStore
}

// Close ensures the database is closed cleanly
func (s *Store) Close(ctx context.Context) error {
return s.database.Close()
Expand Down
3 changes: 3 additions & 0 deletions pkg/compute/store/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"fmt"
"time"

"github.com/bacalhau-project/bacalhau/pkg/lib/watcher"
"github.com/bacalhau-project/bacalhau/pkg/models"
)

Expand Down Expand Up @@ -120,6 +121,8 @@ type ExecutionStore interface {
// GetExecutionCount returns a count of all executions that are in the specified
// state
GetExecutionCount(ctx context.Context, state LocalExecutionStateType) (uint64, error)
// GetEventStore returns the event store for the execution store
GetEventStore() watcher.EventStore
// Close provides the opportunity for the underlying store to cleanup
// any resources as the compute node is shutting down
Close(ctx context.Context) error
Expand Down
177 changes: 177 additions & 0 deletions pkg/lib/watcher/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,177 @@
# Watcher Library

## Overview

The Watcher Library is an internal component of the Bacalhau project that provides a robust event watching and processing system. It's designed to efficiently store, retrieve, and process events. The library ensures events are stored in a durable, ordered manner, allowing for consistent and reliable event processing. It supports features like checkpointing, filtering, and long-polling, while maintaining the ability to replay events from any point in the event history.


## Key Features

1. **Ordered Event Processing**: Events are processed in the exact order they were created, ensuring consistency and predictability in event handling.
2. **Durability**: Events are stored persistently in BoltDB, ensuring they survive system restarts or crashes.
3. **Replayability**: The system allows replaying events from any point in history, facilitating data recovery, debugging, and system reconciliation.
4. **Concurrency**: Multiple watchers can process events concurrently, improving system throughput.
5. **Filtering**: Watchers can filter events based on object types and operations, allowing for targeted event processing.
6. **Checkpointing**: Watchers can save their progress and resume from where they left off, enhancing reliability and efficiency.
7. **Long-polling**: Efficient event retrieval with support for long-polling, reducing unnecessary network traffic and database queries.
8. **Garbage Collection**: Automatic cleanup of old events to manage storage while maintaining the ability to replay from critical points.
9. **Flexible Event Iteration**: Different types of iterators for various use cases, including the ability to start from the oldest event, the latest event, or any specific point in the event history.


## Key Components

1. **Registry**: Manages multiple watchers and provides methods to create and manage watchers.
2. **Watcher**: Represents a single event watcher that processes events sequentially.
3. **EventStore**: Responsible for storing and retrieving events, with BoltDB as the default implementation.
4. **EventHandler**: Interface for handling individual events.
5. **Serializer**: Handles the serialization and deserialization of events.

## Core Concepts

### Event

An `Event` represents a single occurrence in the system. It has the following properties:

- `SeqNum`: A unique, sequential identifier for the event.
- `Operation`: The type of operation (Create, Update, Delete).
- `ObjectType`: The type of object the event relates to.
- `Object`: The actual data associated with the event.
- `Timestamp`: When the event occurred.

### EventStore

The `EventStore` is responsible for persisting events and providing methods to retrieve them. It uses BoltDB as the underlying storage engine and supports features like caching, checkpointing, and garbage collection.

### Registry

The `Registry` manages multiple watchers. It's the main entry point for components that want to subscribe to events.

### Watcher

A `Watcher` represents a single subscriber to events. It processes events sequentially and can be configured with filters and checkpoints.

### EventIterator

An `EventIterator` defines the starting position for reading events. There are four types of iterators:

1. **TrimHorizonIterator**: Starts from the oldest available event.
2. **LatestIterator**: Starts from the latest available event.
3. **AtSequenceNumberIterator**: Starts at a specific sequence number.
4. **AfterSequenceNumberIterator**: Starts after a specific sequence number.

## Usage

Here's how you typically use the Watcher library within Bacalhau:

1. Create an EventStore:

```go
db, _ := bbolt.Open("events.db", 0600, nil)
store, _ := boltdb.NewEventStore(db)
```

2. Create a Registry:
```go
registry := watcher.NewRegistry(store)
```

3. Implement an EventHandler:
```go
type MyHandler struct{}

func (h *MyHandler) HandleEvent(ctx context.Context, event watcher.Event) error {
// Process the event
return nil
}
```


4. Start watching for events:
```go
watcher, _ := registry.Watch(ctx, "my-watcher", &MyHandler{},
watcher.WithFilter(watcher.EventFilter{
ObjectTypes: []string{"Job", "Execution"},
Operations: []watcher.Operation{watcher.OperationCreate, watcher.OperationUpdate},
}),
)
```

5. Store events:
```go
store.StoreEvent(ctx, watcher.OperationCreate, "Job", jobData)
```


## Configuration

### Watch Configuration

When creating a watcher, you can configure it with various options:

- `WithInitialEventIterator(iterator EventIterator)`: Sets the starting position for watching if no checkpoint is found.
- `WithFilter(filter EventFilter)`: Sets the event filter for watching.
- `WithBufferSize(size int)`: Sets the size of the event buffer.
- `WithBatchSize(size int)`: Sets the number of events to fetch in each batch.
- `WithInitialBackoff(backoff time.Duration)`: Sets the initial backoff duration for retries.
- `WithMaxBackoff(backoff time.Duration)`: Sets the maximum backoff duration for retries.
- `WithMaxRetries(maxRetries int)`: Sets the maximum number of retries for event handling.
- `WithRetryStrategy(strategy RetryStrategy)`: Sets the retry strategy for event handling.

Example:

```go
watcher, err := registry.Watch(ctx, "my-watcher", &MyHandler{},
watcher.WithInitialEventIterator(watcher.TrimHorizonIterator()),
watcher.WithFilter(watcher.EventFilter{
ObjectTypes: []string{"Job", "Execution"},
Operations: []watcher.Operation{watcher.OperationCreate, watcher.OperationUpdate},
}),
watcher.WithBufferSize(1000),
watcher.WithBatchSize(100),
watcher.WithMaxRetries(3),
watcher.WithRetryStrategy(watcher.RetryStrategyBlock),
)
```

### EventStore Configuration (BoltDB)

The BoltDB EventStore can be configured with various options:

- `WithEventsBucket(name string)`: Sets the name of the bucket used to store events.
- `WithCheckpointBucket(name string)`: Sets the name of the bucket used to store checkpoints.
- `WithEventSerializer(serializer watcher.Serializer)`: Sets the serializer used for events.
- `WithCacheSize(size int)`: Sets the size of the LRU cache used to store events.
- `WithLongPollingTimeout(timeout time.Duration)`: Sets the timeout duration for long-polling requests.
- `WithGCAgeThreshold(threshold time.Duration)`: Sets the age threshold for event pruning.
- `WithGCCadence(cadence time.Duration)`: Sets the interval at which garbage collection runs.
- `WithGCMaxRecordsPerRun(max int)`: Sets the maximum number of records to process in a single GC run.
- `WithGCMaxDuration(duration time.Duration)`: Sets the maximum duration for a single GC run.

Example:

```go
store, err := boltdb.NewEventStore(db,
boltdb.WithEventsBucket("myEvents"),
boltdb.WithCheckpointBucket("myCheckpoints"),
boltdb.WithCacheSize(1000),
boltdb.WithLongPollingTimeout(10*time.Second),
)
```


## Best Practices

1. Use meaningful watcher IDs to easily identify different components subscribing to events.
2. Implement error handling in your `EventHandler` to ensure robust event processing.
3. Use appropriate filters to minimize unnecessary event processing.
4. Regularly checkpoint your watchers to enable efficient restarts.
5. Monitor watcher stats to ensure they're keeping up with event volume.

## Troubleshooting

1. If a watcher is falling behind, consider increasing the batch size or optimizing the event handling logic.
2. For performance issues, check the BoltDB file size and consider tuning the garbage collection parameters.


## Future Improvements
1. Enhanced monitoring and metrics.
Loading

0 comments on commit f82ca46

Please sign in to comment.