From 424523c50cb893ae6048fe8a7b35c3486a4e5e07 Mon Sep 17 00:00:00 2001 From: Kent Rancourt Date: Fri, 17 Jun 2016 13:24:41 -0400 Subject: [PATCH] feat(storage): Add redis storage adapter --- Makefile | 36 ++++++++-- glide.lock | 17 +++-- glide.yaml | 1 + storage/factory.go | 8 +++ storage/factory_test.go | 12 ++++ storage/redis/adapter.go | 83 ++++++++++++++++++++++ storage/redis/adapter_test.go | 128 ++++++++++++++++++++++++++++++++++ storage/redis/config.go | 24 +++++++ 8 files changed, 297 insertions(+), 12 deletions(-) create mode 100644 storage/redis/adapter.go create mode 100644 storage/redis/adapter_test.go create mode 100644 storage/redis/config.go diff --git a/Makefile b/Makefile index b85e5a4..9b8cf24 100644 --- a/Makefile +++ b/Makefile @@ -15,8 +15,9 @@ REPO_PATH = github.com/deis/logger # and other build options DEV_ENV_IMAGE := quay.io/deis/go-dev:0.13.0 DEV_ENV_WORK_DIR := /go/src/${REPO_PATH} -DEV_ENV_CMD := docker run --rm -v ${CURDIR}:${DEV_ENV_WORK_DIR} -w ${DEV_ENV_WORK_DIR} ${DEV_ENV_IMAGE} -DEV_ENV_CMD_INT := docker run -it --rm -v ${CURDIR}:${DEV_ENV_WORK_DIR} -w ${DEV_ENV_WORK_DIR} ${DEV_ENV_IMAGE} +DEV_ENV_OPTS := --rm -v ${CURDIR}:${DEV_ENV_WORK_DIR} -w ${DEV_ENV_WORK_DIR} +DEV_ENV_CMD := docker run ${DEV_ENV_OPTS} ${DEV_ENV_IMAGE} +DEV_ENV_CMD_INT := docker run -it ${DEV_ENV_OPTS} ${DEV_ENV_IMAGE} LDFLAGS := "-s -X main.version=${VERSION}" BINARY_DEST_DIR = rootfs/opt/logger/sbin @@ -29,6 +30,8 @@ IMAGE_PREFIX ?= deis include versioning.mk +REDIS_CONTAINER_NAME := test-redis-${VERSION} + SHELL_SCRIPTS = $(wildcard _scripts/*.sh) check-docker: @@ -75,8 +78,15 @@ update-manifests: test: test-style test-unit -test-cover: - ${DEV_ENV_CMD} test-cover.sh +test-cover: start-test-redis + docker run ${DEV_ENV_OPTS} \ + -it \ + --link ${REDIS_CONTAINER_NAME}:TEST_REDIS \ + ${DEV_ENV_IMAGE} bash -c 'DEIS_LOGGER_REDIS_SERVICE_HOST=$$TEST_REDIS_PORT_6379_TCP_ADDR \ + DEIS_LOGGER_REDIS_SERVICE_PORT=$$TEST_REDIS_PORT_6379_TCP_PORT \ + test-cover.sh' \ + || (make stop-test-redis && false) + make stop-test-redis test-style: check-docker ${DEV_ENV_CMD} make style-check @@ -89,8 +99,22 @@ style-check: $(GOLINT) ./... shellcheck $(SHELL_SCRIPTS) -test-unit: - ${DEV_ENV_CMD} $(GOTEST) $$(glide nv) +start-test-redis: + docker run --name ${REDIS_CONTAINER_NAME} -d redis:latest + +stop-test-redis: + docker kill ${REDIS_CONTAINER_NAME} + docker rm ${REDIS_CONTAINER_NAME} + +test-unit: start-test-redis + docker run ${DEV_ENV_OPTS} \ + -it \ + --link ${REDIS_CONTAINER_NAME}:TEST_REDIS \ + ${DEV_ENV_IMAGE} bash -c 'DEIS_LOGGER_REDIS_SERVICE_HOST=$$TEST_REDIS_PORT_6379_TCP_ADDR \ + DEIS_LOGGER_REDIS_SERVICE_PORT=$$TEST_REDIS_PORT_6379_TCP_PORT \ + $(GOTEST) $$(glide nv)' \ + || (make stop-test-redis && false) + make stop-test-redis kube-install: kubectl create -f manifests/deis-logger-svc.yaml diff --git a/glide.lock b/glide.lock index 2e530c7..9f3b0a8 100644 --- a/glide.lock +++ b/glide.lock @@ -1,10 +1,6 @@ -hash: 2e289330bc87345e9bd848b72552b65f7828a3cdc404f374bb8642abeddf3aad -updated: 2016-06-22T16:00:53.76132532Z +hash: 03d367b59b94b85b1eeb7acec7a679364921474ccc607e8d36a6558d94af5369 +updated: 2016-06-24T22:33:20.466933554Z imports: -- name: github.com/deis/logger - version: 3bf21b87c7e6ac521ab3eb05904dd147be4bfd8a - subpackages: - - osutils - name: github.com/gorilla/context version: aed02d124ae4a0e94fea4541c8effd05bf0c8296 - name: github.com/gorilla/mux @@ -17,4 +13,13 @@ imports: - snappy-go - name: github.com/nsqio/go-nsq version: d71fb89c9e0263aaed89645b0b168e11ccf597b1 +- name: gopkg.in/bsm/ratelimit.v1 + version: db14e161995a5177acef654cb0dd785e8ee8bc22 +- name: gopkg.in/redis.v3 + version: a905127dc89ca51a241eacf70757a40423ab93d0 + subpackages: + - internal + - internal/consistenthash + - internal/hashtag + - internal/pool devImports: [] diff --git a/glide.yaml b/glide.yaml index b24bdb0..6ddb606 100644 --- a/glide.yaml +++ b/glide.yaml @@ -4,3 +4,4 @@ import: - package: github.com/gorilla/context - package: github.com/kelseyhightower/envconfig - package: github.com/nsqio/go-nsq +- package: gopkg.in/redis.v3 diff --git a/storage/factory.go b/storage/factory.go index b0fde52..a76638e 100644 --- a/storage/factory.go +++ b/storage/factory.go @@ -3,6 +3,7 @@ package storage import ( "fmt" "github.com/deis/logger/storage/file" + "github.com/deis/logger/storage/redis" "github.com/deis/logger/storage/ringbuffer" ) @@ -23,5 +24,12 @@ func NewAdapter(storeageAdapterType string, numLines int) (Adapter, error) { } return adapter, nil } + if storeageAdapterType == "redis" { + adapter, err := redis.NewStorageAdapter(numLines) + if err != nil { + return nil, err + } + return adapter, nil + } return nil, fmt.Errorf("Unrecognized storage adapter type: '%s'", storeageAdapterType) } diff --git a/storage/factory_test.go b/storage/factory_test.go index ad25e5f..01e96a0 100644 --- a/storage/factory_test.go +++ b/storage/factory_test.go @@ -38,6 +38,18 @@ func TestGetMemoryBasedAdapter(t *testing.T) { } } +func TestGetRedisBasedAdapter(t *testing.T) { + a, err := NewAdapter("redis", 1) + if err != nil { + t.Error(err) + } + expected := "*redis.adapter" + aType := reflect.TypeOf(a).String() + if aType != expected { + t.Errorf("Expected a %s, but got a %s", expected, aType) + } +} + func TestMain(m *testing.M) { os.Exit(m.Run()) } diff --git a/storage/redis/adapter.go b/storage/redis/adapter.go new file mode 100644 index 0000000..071d53d --- /dev/null +++ b/storage/redis/adapter.go @@ -0,0 +1,83 @@ +package redis + +import ( + "fmt" + "log" + + r "gopkg.in/redis.v3" +) + +type adapter struct { + bufferSize int + redisClient *r.Client +} + +// NewStorageAdapter returns a pointer to a new instance of a redis-based storage.Adapter. +func NewStorageAdapter(bufferSize int) (*adapter, error) { + if bufferSize <= 0 { + return nil, fmt.Errorf("Invalid buffer size: %d", bufferSize) + } + cfg, err := parseConfig(appName) + if err != nil { + log.Fatalf("config error: %s: ", err) + } + if err != nil { + return nil, err + } + return &adapter{ + bufferSize: bufferSize, + redisClient: r.NewClient(&r.Options{ + Addr: fmt.Sprintf("%s:%d", cfg.RedisHost, cfg.RedisPort), + Password: cfg.RedisPassword, // "" == no password + DB: int64(cfg.RedisDB), + }), + }, nil +} + +// Write adds a log message to to an app-specific list in redis using ring-buffer-like semantics +func (a *adapter) Write(app string, message string) error { + // Note: Deliberately NOT using MULTI / transactions here since in this implementation of the + // redis client, MULTI is not safe for concurrent use by multiple goroutines. It's been advised + // by the authors of the gopkg.in/redis.v3 package to just use pipelining when possible... + // and here that is technically possible. In the WORST case scenario, not having transactions + // means we may momentarily have more than the desired number of log entries in the list / + // buffer, but an LTRIM will eventually correct that, bringing the list / buffer back down to + // its desired max size. + pipeline := a.redisClient.Pipeline() + if err := pipeline.RPush(app, message).Err(); err != nil { + return err + } + if err := pipeline.LTrim(app, int64(-1*a.bufferSize), -1).Err(); err != nil { + return err + } + if _, err := pipeline.Exec(); err != nil { + return err + } + return nil +} + +// Read retrieves a specified number of log lines from an app-specific list in redis +func (a *adapter) Read(app string, lines int) ([]string, error) { + stringSliceCmd := a.redisClient.LRange(app, int64(-1*lines), -1) + result, err := stringSliceCmd.Result() + if err != nil { + return nil, err + } + if len(result) > 0 { + return result, nil + } + return nil, fmt.Errorf("Could not find logs for '%s'", app) +} + +// Destroy deletes an app-specific list from redis +func (a *adapter) Destroy(app string) error { + if err := a.redisClient.Del(app).Err(); err != nil { + return err + } + return nil +} + +func (a *adapter) Reopen() error { + // No-op + return nil +} diff --git a/storage/redis/adapter_test.go b/storage/redis/adapter_test.go new file mode 100644 index 0000000..94e7f15 --- /dev/null +++ b/storage/redis/adapter_test.go @@ -0,0 +1,128 @@ +package redis + +import ( + "fmt" + "testing" +) + +const app string = "test-app" + +func TestReadFromNonExistingApp(t *testing.T) { + // Initialize a new storage adapter + a, err := NewStorageAdapter(10) + if err != nil { + t.Error(err) + } + // No logs have been written; there should be no redis list for app + messages, err := a.Read(app, 10) + if messages != nil { + t.Error("Expected no messages, but got some") + } + if err == nil || err.Error() != fmt.Sprintf("Could not find logs for '%s'", app) { + t.Error("Did not receive expected error message") + } +} + +func TestWithBadBufferSizes(t *testing.T) { + // Initialize with invalid buffer sizes + for _, size := range []int{-1, 0} { + a, err := NewStorageAdapter(size) + if a != nil { + t.Error("Expected no storage adapter, but got one") + } + if err == nil || err.Error() != fmt.Sprintf("Invalid buffer size: %d", size) { + t.Error("Did not receive expected error message") + } + } +} + +func TestLogs(t *testing.T) { + // Initialize with small buffers + a, err := NewStorageAdapter(10) + if err != nil { + t.Error(err) + } + // And write a few logs to it, but do NOT fill it up + for i := 0; i < 5; i++ { + if err := a.Write(app, fmt.Sprintf("message %d", i)); err != nil { + t.Error(err) + } + } + // Read more logs than there are + messages, err := a.Read(app, 8) + if err != nil { + t.Error(err) + } + // Should only get as many messages as we actually have + if len(messages) != 5 { + t.Errorf("only expected 5 log messages, got %d", len(messages)) + } + // Read fewer logs than there are + messages, err = a.Read(app, 3) + if err != nil { + t.Error(err) + } + // Should get the 3 MOST RECENT logs + if len(messages) != 3 { + t.Errorf("only expected 5 log messages, got %d", len(messages)) + } + for i := 0; i < 3; i++ { + expectedMessage := fmt.Sprintf("message %d", i+2) + if messages[i] != expectedMessage { + t.Errorf("expected: \"%s\", got \"%s\"", expectedMessage, messages[i]) + } + } + // Overfill the buffer + for i := 5; i < 11; i++ { + if err := a.Write(app, fmt.Sprintf("message %d", i)); err != nil { + t.Error(err) + } + } + // Read more logs than the buffer can hold + messages, err = a.Read(app, 20) + if err != nil { + t.Error(err) + } + // Should only get as many messages as the buffer can hold + if len(messages) != 10 { + t.Errorf("only expected 10 log messages, got %d", len(messages)) + } + // And they should only be the 10 MOST RECENT logs + for i := 0; i < 10; i++ { + expectedMessage := fmt.Sprintf("message %d", i+1) + if messages[i] != expectedMessage { + t.Errorf("expected: \"%s\", got \"%s\"", expectedMessage, messages[i]) + } + } +} + +func TestDestroy(t *testing.T) { + a, err := NewStorageAdapter(10) + if err != nil { + t.Error(err) + } + // Write a log to create the file + if err := a.Write(app, "Hello, log!"); err != nil { + t.Error(err) + } + // A redis list should exist for the app + exists, err := a.redisClient.Exists(app).Result() + if err != nil { + t.Error(err) + } + if !exists { + t.Error("Log redis list was expected to exist, but doesn't.") + } + // Now destroy it + if err := a.Destroy(app); err != nil { + t.Error(err) + } + // Now check that the redis list no longer exists + exists, err = a.redisClient.Exists(app).Result() + if err != nil { + t.Error(err) + } + if exists { + t.Error("Log redis list still exist, but was expected not to.") + } +} diff --git a/storage/redis/config.go b/storage/redis/config.go new file mode 100644 index 0000000..e2ba145 --- /dev/null +++ b/storage/redis/config.go @@ -0,0 +1,24 @@ +package redis + +import ( + "github.com/kelseyhightower/envconfig" +) + +const ( + appName = "logger" +) + +type config struct { + RedisHost string `envconfig:"DEIS_LOGGER_REDIS_SERVICE_HOST" default:""` + RedisPort int `envconfig:"DEIS_LOGGER_REDIS_SERVICE_PORT" default:"6379"` + RedisPassword string `envconfig:"DEIS_LOGGER_REDIS_PASSWORD" default:""` + RedisDB int `envconfig:"DEIS_LOGGER_REDIS_DB" default:"0"` +} + +func parseConfig(appName string) (*config, error) { + ret := new(config) + if err := envconfig.Process(appName, ret); err != nil { + return nil, err + } + return ret, nil +}