Skip to content

Commit

Permalink
Copy of storage extension from contrib
Browse files Browse the repository at this point in the history
  • Loading branch information
pmm-sumo committed May 24, 2021
1 parent 4c68d9d commit 10c5218
Show file tree
Hide file tree
Showing 26 changed files with 1,252 additions and 3 deletions.
1 change: 1 addition & 0 deletions extension/storage/Makefile
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
include ../../Makefile.Common
16 changes: 16 additions & 0 deletions extension/storage/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
# Storage

A storage extension persists state beyond the collector process. Other components can request a storage client from the storage extension and use it to manage state.

The `storage.Extension` interface extends `component.Extension` by adding the following method:
```
GetClient(component.Kind, component.Kind, config.ComponentID) (Client, error)
```

The `storage.Client` interface contains the following methods:
```
Get(string) ([]byte, error)
Set(string, []byte) error
Delete(string) error
```
Note: All methods should return error only if a problem occurred. (For example, if a file is no longer accessible, or if a remote service is unavailable.)
17 changes: 17 additions & 0 deletions extension/storage/doc.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// Package stanzareceiver implements a receiver that can be used by the
// Opentelemetry collector to receive logs using the stanza log agent
package storage
1 change: 1 addition & 0 deletions extension/storage/filestorage/Makefile
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
include ../../Makefile.Common
36 changes: 36 additions & 0 deletions extension/storage/filestorage/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
# File Storage

> :construction: This extension is in alpha. Configuration and functionality are subject to change.
The File Storage extension can persist state to the local file system.

The extension requires read and write access to a directory. A default directory can be used, but it must already exist in order for the extension to operate.

`directory` is the relative or absolute path to the dedicated data storage directory.

`timeout` is the maximum time to wait for a file lock. This value does not need to be modified in most circumstances.


```
extensions:
file_storage:
file_storage/all_settings:
directory: /var/lib/otelcol/mydir
timeout: 1s
service:
extensions: [file_storage, file_storage/all_settings]
pipelines:
traces:
receivers: [nop]
processors: [nop]
exporters: [nop]
# Data pipeline is required to load the config.
receivers:
nop:
processors:
nop:
exporters:
nop:
```
99 changes: 99 additions & 0 deletions extension/storage/filestorage/client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package filestorage

import (
"context"
"errors"
"time"

"go.etcd.io/bbolt"
)

var defaultBucket = []byte(`default`)

type fileStorageClient struct {
db *bbolt.DB
}

func newClient(filePath string, timeout time.Duration) (*fileStorageClient, error) {
options := &bbolt.Options{
Timeout: timeout,
NoSync: true,
}
db, err := bbolt.Open(filePath, 0600, options)
if err != nil {
return nil, err
}

initBucket := func(tx *bbolt.Tx) error {
_, err := tx.CreateBucketIfNotExists(defaultBucket)
return err
}
if err := db.Update(initBucket); err != nil {
return nil, err
}

return &fileStorageClient{db}, nil
}

// Get will retrieve data from storage that corresponds to the specified key
func (c *fileStorageClient) Get(_ context.Context, key string) ([]byte, error) {
var result []byte
get := func(tx *bbolt.Tx) error {
bucket := tx.Bucket(defaultBucket)
if bucket == nil {
return errors.New("storage not initialized")
}
result = bucket.Get([]byte(key))
return nil // no error
}

if err := c.db.Update(get); err != nil {
return nil, err
}
return result, nil
}

// Set will store data. The data can be retrieved using the same key
func (c *fileStorageClient) Set(_ context.Context, key string, value []byte) error {
set := func(tx *bbolt.Tx) error {
bucket := tx.Bucket(defaultBucket)
if bucket == nil {
return errors.New("storage not initialized")
}
return bucket.Put([]byte(key), value)
}

return c.db.Update(set)
}

// Delete will delete data associated with the specified key
func (c *fileStorageClient) Delete(_ context.Context, key string) error {
delete := func(tx *bbolt.Tx) error {
bucket := tx.Bucket(defaultBucket)
if bucket == nil {
return errors.New("storage not initialized")
}
return bucket.Delete([]byte(key))
}

return c.db.Update(delete)
}

// Close will close the database
func (c *fileStorageClient) close() error {
return c.db.Close()
}
194 changes: 194 additions & 0 deletions extension/storage/filestorage/client_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,194 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package filestorage

import (
"context"
"io/ioutil"
"os"
"path/filepath"
"testing"
"time"

"github.com/stretchr/testify/require"
"go.etcd.io/bbolt"
)

func TestClientOperations(t *testing.T) {
tempDir := newTempDir(t)
dbFile := filepath.Join(tempDir, "my_db")

client, err := newClient(dbFile, time.Second)
require.NoError(t, err)

ctx := context.Background()
testKey := "testKey"
testValue := []byte("testValue")

// Make sure nothing is there
value, err := client.Get(ctx, testKey)
require.NoError(t, err)
require.Nil(t, value)

// Set it
err = client.Set(ctx, testKey, testValue)
require.NoError(t, err)

// Get it back out, make sure it's right
value, err = client.Get(ctx, testKey)
require.NoError(t, err)
require.Equal(t, testValue, value)

// Delete it
err = client.Delete(ctx, testKey)
require.NoError(t, err)

// Make sure it's gone
value, err = client.Get(ctx, testKey)
require.NoError(t, err)
require.Nil(t, value)
}

func TestNewClientTransactionErrors(t *testing.T) {
timeout := 100 * time.Millisecond

testKey := "testKey"
testValue := []byte("testValue")

testCases := []struct {
name string
setup func(*bbolt.Tx) error
validate func(*testing.T, *fileStorageClient)
}{
{
name: "get",
setup: func(tx *bbolt.Tx) error {
return tx.DeleteBucket(defaultBucket)
},
validate: func(t *testing.T, c *fileStorageClient) {
value, err := c.Get(context.Background(), testKey)
require.Error(t, err)
require.Equal(t, "storage not initialized", err.Error())
require.Nil(t, value)
},
},
{
name: "set",
setup: func(tx *bbolt.Tx) error {
return tx.DeleteBucket(defaultBucket)
},
validate: func(t *testing.T, c *fileStorageClient) {
err := c.Set(context.Background(), testKey, testValue)
require.Error(t, err)
require.Equal(t, "storage not initialized", err.Error())
},
},
{
name: "delete",
setup: func(tx *bbolt.Tx) error {
return tx.DeleteBucket(defaultBucket)
},
validate: func(t *testing.T, c *fileStorageClient) {
err := c.Delete(context.Background(), testKey)
require.Error(t, err)
require.Equal(t, "storage not initialized", err.Error())
},
},
}

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {

tempDir := newTempDir(t)
dbFile := filepath.Join(tempDir, "my_db")

client, err := newClient(dbFile, timeout)
require.NoError(t, err)

// Create a problem
client.db.Update(tc.setup)

// Validate expected behavior
tc.validate(t, client)
})
}
}

func TestNewClientErrorsOnInvalidBucket(t *testing.T) {
temp := defaultBucket
defaultBucket = nil

tempDir := newTempDir(t)
dbFile := filepath.Join(tempDir, "my_db")

client, err := newClient(dbFile, time.Second)
require.Error(t, err)
require.Nil(t, client)

defaultBucket = temp
}

func BenchmarkClientGet(b *testing.B) {
tempDir := newTempDir(b)
dbFile := filepath.Join(tempDir, "my_db")

client, err := newClient(dbFile, time.Second)
require.NoError(b, err)

ctx := context.Background()
testKey := "testKey"

for n := 0; n < b.N; n++ {
client.Get(ctx, testKey)
}
}

func BenchmarkClientSet(b *testing.B) {
tempDir := newTempDir(b)
dbFile := filepath.Join(tempDir, "my_db")

client, err := newClient(dbFile, time.Second)
require.NoError(b, err)

ctx := context.Background()
testKey := "testKey"
testValue := []byte("testValue")

for n := 0; n < b.N; n++ {
client.Set(ctx, testKey, testValue)
}
}

func BenchmarkClientDelete(b *testing.B) {
tempDir := newTempDir(b)
dbFile := filepath.Join(tempDir, "my_db")

client, err := newClient(dbFile, time.Second)
require.NoError(b, err)

ctx := context.Background()
testKey := "testKey"

for n := 0; n < b.N; n++ {
client.Delete(ctx, testKey)
}
}

func newTempDir(tb testing.TB) string {
tempDir, err := ioutil.TempDir("", "")
require.NoError(tb, err)
tb.Cleanup(func() { os.RemoveAll(tempDir) })
return tempDir
}
Loading

0 comments on commit 10c5218

Please sign in to comment.