Skip to content

Commit

Permalink
Add some documentation
Browse files Browse the repository at this point in the history
  • Loading branch information
turt2live committed Mar 15, 2024
1 parent 68e9c62 commit 857b8ac
Show file tree
Hide file tree
Showing 4 changed files with 114 additions and 21 deletions.
112 changes: 108 additions & 4 deletions bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,10 @@ import (
"time"
)

// ErrBucketFull represents an error indicating that a bucket is full or would overflow.
var ErrBucketFull = errors.New("leaky: bucket full or would overflow")

// Bucket represents a leaky bucket implementation for rate limiting or throttling.
type Bucket struct {
DrainBy int64
DrainInterval time.Duration
Expand All @@ -21,6 +23,23 @@ type Bucket struct {
lock sync.Mutex
}

// NewBucket creates a new Bucket with the given drainBy, drainEvery, and capacity parameters.
// It returns an error if any of the parameters are invalid.
//
// Example usage:
//
// bucket, err := NewBucket(5, 1 * time.Minute, 300)
//
// Parameters:
//
// drainBy - the amount to drain the bucket by each drain interval
// drainEvery - the duration between each drain interval
// capacity - the maximum capacity the bucket can hold
//
// Return values:
//
// *Bucket - the created Bucket instance
// error - error message if any of the parameters are invalid
func NewBucket(drainBy int64, drainEvery time.Duration, capacity int64) (*Bucket, error) {
if drainBy <= 0 || drainEvery <= 0 {
return nil, errors.New("leaky: bucket never drains")
Expand All @@ -38,6 +57,27 @@ func NewBucket(drainBy int64, drainEvery time.Duration, capacity int64) (*Bucket
}, nil
}

// DecodeBucket produces a Bucket from a previous Encode operation.
// It returns an error if any read operation fails. Read operations are performed sequentially rather
// than atomically. If an error occurs, partial data may remain on the reader.
//
// Example usage:
//
// buf := bytes.NewBuffer(myEncodedData)
// if bucket, err := leaky.DecodeBucket(buf); err != nil {
// log.Fatal(err)
// } else {
// // Use the decoded `bucket`
// }
//
// Parameters:
//
// r - an io.Reader interface from which the binary data will be read
//
// Return values:
//
// *Bucket - the Bucket instance decoded from the binary data in r
// error - error message if any errors occurred during reading or decoding
func DecodeBucket(r io.Reader) (*Bucket, error) {
bucket := &Bucket{}

Expand Down Expand Up @@ -83,6 +123,25 @@ func DecodeBucket(r io.Reader) (*Bucket, error) {
return bucket, nil
}

// Encode writes the bucket's state to the provided io.Writer.
// It returns an error if any writing operation fails. Write operations are performed sequentially rather
// than atomically. If an error occurs, partial data may be written to the writer.
//
// Example usage:
//
// buf := &bytes.Buffer{}
// if err := bucket.Encode(buf); err != nil {
// log.Fatal(err)
// }
// // Use the encoded data stored in `buf` as needed.
//
// Parameters:
//
// w - an io.Writer interface to which the binary data will be written
//
// Return values:
//
// error - error message if any errors occurred during writing or encoding
func (b *Bucket) Encode(w io.Writer) error {
b.lock.Lock()
defer b.lock.Unlock()
Expand Down Expand Up @@ -119,6 +178,20 @@ func (b *Bucket) Encode(w io.Writer) error {
return nil
}

// drain updates the value of the bucket by subtracting the drained amount based on the elapsed time since the last drain.
// If the bucket is already empty, it does nothing.
//
// If the bucket has never been drained before, it sets the last drain time as the current time.
// If the bucket value is zero or negative after the drain, it sets the value to zero and updates the last drain time.
//
// The elapsed time since the last drain is calculated by subtracting the last drain time from the current time.
// The elapsed time is truncated to the nearest multiple of the drain interval.
// The number of leaks is then calculated by dividing the elapsed time by the drain interval.
// The drained amount is calculated by multiplying the drain by the number of leaks.
// The bucket value is updated by subtracting the drained amount.
// If the bucket value becomes negative, it is set to zero.
//
// Finally, the last drain time is updated to the current time minus the remaining elapsed time (since - drainTime).
func (b *Bucket) drain() {
b.lock.Lock()
defer b.lock.Unlock()
Expand All @@ -143,20 +216,42 @@ func (b *Bucket) drain() {
b.lastDrain = time.Now().Add((since - drainTime) * -1)
}

// Peek returns the current value of the bucket without performing any drain.
func (b *Bucket) Peek() int64 {
return b.value
}

// Value returns the current value of the bucket after performing a drain operation.
func (b *Bucket) Value() int64 {
b.drain()
return b.value
}

// Remaining returns the remaining capacity of the Bucket.
// It first applies the drain operation to update the bucket's internal value.
// The remaining capacity is calculated by subtracting the current value from the Capacity.
// This method does not modify the bucket's internal value.
//
// Returns the remaining capacity as an int64 value.
func (b *Bucket) Remaining() int64 {
b.drain()
return b.Capacity - b.value
}

// Add increments the value of the Bucket by the specified amount.
// If the new value would exceed Capacity, ErrBucketFull is returned without modifying the bucket's
// internal value. Otherwise, the amount is added to the bucket atomically. In either case, a drain
// operation is performed before checking the capacity.
//
// Returns nil if successful.
//
// Parameters:
//
// amount - the amount by which the bucket's value will be incremented
//
// Return values:
//
// error - ErrBucketFull if the new value would exceed the capacity, otherwise nil
func (b *Bucket) Add(amount int64) error {
b.drain()

Expand All @@ -171,7 +266,18 @@ func (b *Bucket) Add(amount int64) error {
return nil
}

func (b *Bucket) Set(value int64, resetDrain bool) error {
// Set sets the value of the Bucket.
// The value must be positive or zero, and within capacity for the bucket. An error is returned otherwise.
// This is an atomic operation, and resets the drain time.
//
// Parameters:
//
// value - the value to set the bucket to
//
// Return values:
//
// error - error message if the value is invalid
func (b *Bucket) Set(value int64) error {
if value < 0 {
return errors.New("leaky: bucket value cannot be negative")
}
Expand All @@ -183,8 +289,6 @@ func (b *Bucket) Set(value int64, resetDrain bool) error {
defer b.lock.Unlock()

b.value = value
if resetDrain {
b.lastDrain = time.Now()
}
b.lastDrain = time.Now()
return nil
}
17 changes: 4 additions & 13 deletions bucket_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -354,42 +354,33 @@ func TestBucket_Set(t *testing.T) {
}

// Must be positive value
if err = bucket.Set(-1, true); err != nil {
if err = bucket.Set(-1); err != nil {
assert.EqualErrorf(t, err, "leaky: bucket value cannot be negative", "TestBucket_Set(case:%d)", i)
} else {
t.Errorf("TestBucket_Set(case:%d): expected error, got nil", i)
}

// Must be less than capacity
if err = bucket.Set(bucket.Capacity+1, true); err != nil {
if err = bucket.Set(bucket.Capacity + 1); err != nil {
assert.EqualErrorf(t, err, "leaky: bucket value cannot exceed capacity", "TestBucket_Set(case:%d)", i)
} else {
t.Errorf("TestBucket_Set(case:%d): expected error, got nil", i)
}

// Can be zero, and resets lastDrain, and doesn't drain
bucket.lastDrain = time.Now().Add(-5 * bucket.DrainInterval)
if err = bucket.Set(0, true); err != nil {
if err = bucket.Set(0); err != nil {
t.Errorf("TestBucket_Set(case:%d): unexpected Set error %v", i, err)
}
assert.Equal(t, int64(0), bucket.value)
assert.InDeltaf(t, 0*time.Millisecond, time.Since(bucket.lastDrain), float64(10*time.Millisecond), "TestBucket_Set(case:%d)", i)

// Can be positive, and resets lastDrain, and doesn't drain
bucket.lastDrain = time.Now().Add(-5 * bucket.DrainInterval)
if err = bucket.Set(5, true); err != nil {
if err = bucket.Set(5); err != nil {
t.Errorf("TestBucket_Set(case:%d): unexpected Set error %v", i, err)
}
assert.Equal(t, int64(5), bucket.value)
assert.InDeltaf(t, 0*time.Millisecond, time.Since(bucket.lastDrain), float64(10*time.Millisecond), "TestBucket_Set(case:%d)", i)

// Doesn't reset lastDrain when resetDrain=false
drainTime := time.Now().Add(-5 * bucket.DrainInterval)
bucket.lastDrain = drainTime
if err = bucket.Set(10, false); err != nil {
t.Errorf("TestBucket_Set(case:%d): unexpected Set error %v", i, err)
}
assert.Equalf(t, int64(10), bucket.value, "TestBucket_Set(case:%d) should be equal", i)
assert.Equalf(t, drainTime, bucket.lastDrain, "TestBucket_Set(case:%d) should be equal", i)
}
}
4 changes: 1 addition & 3 deletions examples/basic/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,13 +34,11 @@ func main() {
fmt.Println("Undrained size:", bucket.Peek())

// Force the bucket to have a particular size
if err = bucket.Set(42 /*resetDrain=*/, true); err != nil {
if err = bucket.Set(42); err != nil {
panic(err) // TODO: Handle error
} else {
// The bucket is now set to 42, and the drain has been reset. Calling `.Value()` or similar
// right now would not cause the size to decrease.
//
// If for some reason you'd like to leave the drain status unchanged, set resetDrain to false.
fmt.Println("Size after Set:", bucket.Value()) // will not drain, even if a minute had passed
}

Expand Down
2 changes: 1 addition & 1 deletion examples/serialize/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ func main() {
}

// Set a value so we can compare later
if err = bucket.Set(42 /*resetDrain=*/, true); err != nil {
if err = bucket.Set(42); err != nil {
panic(err) // TODO: Handle error
}

Expand Down

0 comments on commit 857b8ac

Please sign in to comment.