diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md index 10fcb08..a44eb3d 100644 --- a/CONTRIBUTING.md +++ b/CONTRIBUTING.md @@ -24,7 +24,7 @@ deckard # Root folder with project files and Golang service/ │ ├── cmd # Executable main file for the Deckard service │ ├── config # Configuration variables managed by viper │ ├── logger # Logging configuration -│ ├── messagepool # Contains all main implementation files and housekeeping program logic +│ ├── queue # Contains all queue implementation files and housekeeping program logic │ │ ├── cache # Caching implementation │ │ ├── entities # Message and QueueConfiguration internal definitions │ │ ├── queue # Queue definition diff --git a/internal/audit/audit.go b/internal/audit/audit.go index f5dca33..4c59fa5 100644 --- a/internal/audit/audit.go +++ b/internal/audit/audit.go @@ -15,10 +15,10 @@ import ( "github.com/takenet/deckard/internal/config" "github.com/takenet/deckard/internal/logger" - "github.com/takenet/deckard/internal/messagepool/entities" - "github.com/takenet/deckard/internal/messagepool/utils" "github.com/takenet/deckard/internal/metrics" "github.com/takenet/deckard/internal/project" + "github.com/takenet/deckard/internal/queue/entities" + "github.com/takenet/deckard/internal/queue/utils" "go.uber.org/zap" "github.com/elastic/go-elasticsearch/v7" diff --git a/internal/cmd/deckard/main.go b/internal/cmd/deckard/main.go index 9e13b3b..5980f12 100644 --- a/internal/cmd/deckard/main.go +++ b/internal/cmd/deckard/main.go @@ -11,12 +11,11 @@ import ( "github.com/takenet/deckard/internal/audit" "github.com/takenet/deckard/internal/config" "github.com/takenet/deckard/internal/logger" - "github.com/takenet/deckard/internal/messagepool" - "github.com/takenet/deckard/internal/messagepool/cache" - "github.com/takenet/deckard/internal/messagepool/queue" - "github.com/takenet/deckard/internal/messagepool/storage" - "github.com/takenet/deckard/internal/messagepool/utils" "github.com/takenet/deckard/internal/metrics" + "github.com/takenet/deckard/internal/queue" + "github.com/takenet/deckard/internal/queue/cache" + "github.com/takenet/deckard/internal/queue/storage" + "github.com/takenet/deckard/internal/queue/utils" "github.com/takenet/deckard/internal/service" "github.com/takenet/deckard/internal/shutdown" "github.com/takenet/deckard/internal/trace" @@ -88,16 +87,16 @@ func main() { } go auditor.StartSender(ctx) - queueService := queue.NewConfigurationService(ctx, dataStorage) + configurationService := queue.NewQueueConfigurationService(ctx, dataStorage) - messagePool := messagepool.NewMessagePool(auditor, dataStorage, queueService, dataCache) + queue := queue.NewQueue(auditor, dataStorage, configurationService, dataCache) if config.GrpcEnabled.GetBool() { - server = startGrpcServer(messagePool, queueService) + server = startGrpcServer(queue, configurationService) } if config.HousekeeperEnabled.GetBool() { - startHouseKeeperJobs(messagePool) + startHouseKeeperJobs(queue) } // Handle sigterm and await termChan signal @@ -113,8 +112,8 @@ func isMemoryInstance() bool { return config.CacheType.Get() == string(cache.MEMORY) && config.StorageType.Get() == string(storage.MEMORY) } -func startGrpcServer(messagepool *messagepool.MessagePool, queueService queue.ConfigurationService) *grpc.Server { - deckard := service.NewDeckardInstance(messagepool, queueService, isMemoryInstance()) +func startGrpcServer(queue *queue.Queue, queueService queue.QueueConfigurationService) *grpc.Server { + deckard := service.NewDeckardInstance(queue, queueService, isMemoryInstance()) server, err := deckard.ServeGRPCServer(ctx) if err != nil { @@ -125,14 +124,14 @@ func startGrpcServer(messagepool *messagepool.MessagePool, queueService queue.Co return server } -func startHouseKeeperJobs(pool *messagepool.MessagePool) { +func startHouseKeeperJobs(pool *queue.Queue) { go scheduleTask( UNLOCK, nil, shutdown.WaitGroup, config.HousekeeperTaskUnlockDelay.GetDuration(), func() bool { - messagepool.ProcessLockPool(ctx, pool) + queue.ProcessLockPool(ctx, pool) return true }, @@ -144,7 +143,7 @@ func startHouseKeeperJobs(pool *messagepool.MessagePool) { shutdown.WaitGroup, config.HousekeeperTaskTimeoutDelay.GetDuration(), func() bool { - _ = messagepool.ProcessTimeoutMessages(ctx, pool) + _ = queue.ProcessTimeoutMessages(ctx, pool) return true }, @@ -156,7 +155,7 @@ func startHouseKeeperJobs(pool *messagepool.MessagePool) { shutdown.WaitGroup, config.HousekeeperTaskMetricsDelay.GetDuration(), func() bool { - messagepool.ComputeMetrics(ctx, pool) + queue.ComputeMetrics(ctx, pool) return true }, @@ -168,7 +167,7 @@ func startHouseKeeperJobs(pool *messagepool.MessagePool) { shutdown.CriticalWaitGroup, config.HousekeeperTaskUpdateDelay.GetDuration(), func() bool { - return messagepool.RecoveryMessagesPool(ctx, pool) + return queue.RecoveryMessagesPool(ctx, pool) }, ) @@ -178,7 +177,7 @@ func startHouseKeeperJobs(pool *messagepool.MessagePool) { shutdown.WaitGroup, config.HousekeeperTaskMaxElementsDelay.GetDuration(), func() bool { - metrify, _ := messagepool.RemoveExceedingMessages(ctx, pool) + metrify, _ := queue.RemoveExceedingMessages(ctx, pool) return metrify }, @@ -192,7 +191,7 @@ func startHouseKeeperJobs(pool *messagepool.MessagePool) { func() bool { now := time.Now() - metrify, _ := messagepool.RemoveTTLMessages(ctx, pool, &now) + metrify, _ := queue.RemoveTTLMessages(ctx, pool, &now) return metrify }, diff --git a/internal/metrics/metrics.go b/internal/metrics/metrics.go index 646ba0d..ebc12e8 100644 --- a/internal/metrics/metrics.go +++ b/internal/metrics/metrics.go @@ -13,8 +13,8 @@ import ( dto "github.com/prometheus/client_model/go" "github.com/takenet/deckard/internal/config" "github.com/takenet/deckard/internal/logger" - "github.com/takenet/deckard/internal/messagepool/utils" "github.com/takenet/deckard/internal/project" + "github.com/takenet/deckard/internal/queue/utils" "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/exporters/prometheus" otelmetric "go.opentelemetry.io/otel/metric" @@ -32,7 +32,7 @@ var ( mutex = sync.Mutex{} meter otelmetric.Meter - MetricsMap *MessagePoolMetricsMap + MetricsMap *QueueMetricsMap // Keep tracking of features used by clients to be able to deprecate them CodeUsage instrument.Int64Counter @@ -48,12 +48,12 @@ var ( HousekeeperTotalElements instrument.Int64ObservableGauge // Message Pool - MessagePoolTimeout instrument.Int64Counter - MessagePoolAck instrument.Int64Counter - MessagePoolNack instrument.Int64Counter - MessagePoolEmptyQueue instrument.Int64Counter - MessagePoolEmptyQueueStorage instrument.Int64Counter - MessagePoolNotFoundInStorage instrument.Int64Counter + QueueTimeout instrument.Int64Counter + QueueAck instrument.Int64Counter + QueueNack instrument.Int64Counter + QueueEmptyQueue instrument.Int64Counter + QueueEmptyQueueStorage instrument.Int64Counter + QueueNotFoundInStorage instrument.Int64Counter // Storage StorageLatency instrument.Int64Histogram @@ -178,7 +178,7 @@ func createDefaultMetrics() []*dto.LabelPair { func createMetrics() { meter = global.MeterProvider().Meter(project.Name) - MetricsMap = NewMessagePoolMetricsMap() + MetricsMap = NewQueueMetricsMap() // Housekeeper @@ -243,39 +243,39 @@ func createMetrics() { ) panicInstrumentationError(err) - // MessagePool + // Queue - MessagePoolTimeout, err = meter.Int64Counter( + QueueTimeout, err = meter.Int64Counter( "deckard_message_timeout", instrument.WithDescription("Number of message timeouts"), ) panicInstrumentationError(err) - MessagePoolAck, err = meter.Int64Counter( + QueueAck, err = meter.Int64Counter( "deckard_ack", instrument.WithDescription("Number of acks received"), ) panicInstrumentationError(err) - MessagePoolNack, err = meter.Int64Counter( + QueueNack, err = meter.Int64Counter( "deckard_nack", instrument.WithDescription("Number of nacks received"), ) panicInstrumentationError(err) - MessagePoolEmptyQueue, err = meter.Int64Counter( + QueueEmptyQueue, err = meter.Int64Counter( "deckard_messages_empty_queue", instrument.WithDescription("Number of times a pull is made against an empty queue."), ) panicInstrumentationError(err) - MessagePoolEmptyQueueStorage, err = meter.Int64Counter( + QueueEmptyQueueStorage, err = meter.Int64Counter( "deckard_messages_empty_queue_not_found_storage", instrument.WithDescription("Number of times a pull is made against an empty queue because the messages were not found in the storage."), ) panicInstrumentationError(err) - MessagePoolNotFoundInStorage, err = meter.Int64Counter( + QueueNotFoundInStorage, err = meter.Int64Counter( "deckard_messages_not_found_in_storage", instrument.WithDescription("Number of messages in cache but not found in the storage."), ) diff --git a/internal/metrics/metrics_map.go b/internal/metrics/metrics_map.go index 846106d..381bb62 100644 --- a/internal/metrics/metrics_map.go +++ b/internal/metrics/metrics_map.go @@ -1,23 +1,23 @@ package metrics // Type to hold map of oldest queue elements -type MessagePoolMetricsMap struct { +type QueueMetricsMap struct { OldestElement map[string]int64 TotalElements map[string]int64 } -func NewMessagePoolMetricsMap() *MessagePoolMetricsMap { - return &MessagePoolMetricsMap{ +func NewQueueMetricsMap() *QueueMetricsMap { + return &QueueMetricsMap{ OldestElement: make(map[string]int64, 0), TotalElements: make(map[string]int64, 0), } } -func (oldestMap *MessagePoolMetricsMap) UpdateOldestElementMap(data map[string]int64) { +func (oldestMap *QueueMetricsMap) UpdateOldestElementMap(data map[string]int64) { oldestMap.OldestElement = mergeData(oldestMap.OldestElement, data) } -func (oldestMap *MessagePoolMetricsMap) UpdateTotalElementsMap(data map[string]int64) { +func (oldestMap *QueueMetricsMap) UpdateTotalElementsMap(data map[string]int64) { oldestMap.TotalElements = mergeData(oldestMap.TotalElements, data) } diff --git a/internal/metrics/metrics_map_test.go b/internal/metrics/metrics_map_test.go index d0a7c54..233bcdf 100644 --- a/internal/metrics/metrics_map_test.go +++ b/internal/metrics/metrics_map_test.go @@ -7,7 +7,7 @@ import ( ) func TestChangeMapShouldChangeSuccessfully(t *testing.T) { - data := NewMessagePoolMetricsMap() + data := NewQueueMetricsMap() require.Empty(t, data.OldestElement) @@ -20,7 +20,7 @@ func TestChangeMapShouldChangeSuccessfully(t *testing.T) { } func TestChangeMapWithNilMapShouldEmptyMap(t *testing.T) { - data := NewMessagePoolMetricsMap() + data := NewQueueMetricsMap() data.OldestElement["a"] = int64(123) @@ -32,7 +32,7 @@ func TestChangeMapWithNilMapShouldEmptyMap(t *testing.T) { } func TestChangeMapWithMissingQueueShouldKeepElementAsZero(t *testing.T) { - data := NewMessagePoolMetricsMap() + data := NewQueueMetricsMap() data.OldestElement["a"] = int64(123) data.OldestElement["b"] = int64(432) diff --git a/internal/messagepool/cache/cache.go b/internal/queue/cache/cache.go similarity index 97% rename from internal/messagepool/cache/cache.go rename to internal/queue/cache/cache.go index 03bda7b..ebf7c5e 100644 --- a/internal/messagepool/cache/cache.go +++ b/internal/queue/cache/cache.go @@ -7,7 +7,7 @@ import ( "errors" "time" - "github.com/takenet/deckard/internal/messagepool/entities" + "github.com/takenet/deckard/internal/queue/entities" ) type Type string diff --git a/internal/messagepool/cache/cache_suite_test.go b/internal/queue/cache/cache_suite_test.go similarity index 99% rename from internal/messagepool/cache/cache_suite_test.go rename to internal/queue/cache/cache_suite_test.go index 928b5b0..fd0732f 100644 --- a/internal/messagepool/cache/cache_suite_test.go +++ b/internal/queue/cache/cache_suite_test.go @@ -8,8 +8,8 @@ import ( "github.com/stretchr/testify/require" "github.com/stretchr/testify/suite" - "github.com/takenet/deckard/internal/messagepool/entities" - "github.com/takenet/deckard/internal/messagepool/utils" + "github.com/takenet/deckard/internal/queue/entities" + "github.com/takenet/deckard/internal/queue/utils" ) var ctx, cancel = context.WithCancel(context.Background()) diff --git a/internal/messagepool/cache/memory_cache.go b/internal/queue/cache/memory_cache.go similarity index 98% rename from internal/messagepool/cache/memory_cache.go rename to internal/queue/cache/memory_cache.go index 3546fbb..f5ae4fa 100644 --- a/internal/messagepool/cache/memory_cache.go +++ b/internal/queue/cache/memory_cache.go @@ -7,8 +7,8 @@ import ( "sync" "time" - "github.com/takenet/deckard/internal/messagepool/entities" - "github.com/takenet/deckard/internal/messagepool/utils" + "github.com/takenet/deckard/internal/queue/entities" + "github.com/takenet/deckard/internal/queue/utils" ) // MemoryStorage is an implementation of the Storage Interface using memory. diff --git a/internal/messagepool/cache/memory_cache_test.go b/internal/queue/cache/memory_cache_test.go similarity index 100% rename from internal/messagepool/cache/memory_cache_test.go rename to internal/queue/cache/memory_cache_test.go diff --git a/internal/messagepool/cache/redis_cache.go b/internal/queue/cache/redis_cache.go similarity index 99% rename from internal/messagepool/cache/redis_cache.go rename to internal/queue/cache/redis_cache.go index e361388..8cac2aa 100644 --- a/internal/messagepool/cache/redis_cache.go +++ b/internal/queue/cache/redis_cache.go @@ -13,9 +13,9 @@ import ( "github.com/meirf/gopart" "github.com/takenet/deckard/internal/config" "github.com/takenet/deckard/internal/logger" - "github.com/takenet/deckard/internal/messagepool/entities" - "github.com/takenet/deckard/internal/messagepool/utils" "github.com/takenet/deckard/internal/metrics" + "github.com/takenet/deckard/internal/queue/entities" + "github.com/takenet/deckard/internal/queue/utils" "go.opentelemetry.io/otel/attribute" ) diff --git a/internal/messagepool/cache/redis_cache_scripts.go b/internal/queue/cache/redis_cache_scripts.go similarity index 100% rename from internal/messagepool/cache/redis_cache_scripts.go rename to internal/queue/cache/redis_cache_scripts.go diff --git a/internal/messagepool/cache/redis_cache_test.go b/internal/queue/cache/redis_cache_test.go similarity index 98% rename from internal/messagepool/cache/redis_cache_test.go rename to internal/queue/cache/redis_cache_test.go index 1a13c50..d08751e 100644 --- a/internal/messagepool/cache/redis_cache_test.go +++ b/internal/queue/cache/redis_cache_test.go @@ -8,7 +8,7 @@ import ( "github.com/stretchr/testify/require" "github.com/stretchr/testify/suite" "github.com/takenet/deckard/internal/config" - "github.com/takenet/deckard/internal/messagepool/entities" + "github.com/takenet/deckard/internal/queue/entities" ) func TestRedisCacheIntegration(t *testing.T) { diff --git a/internal/messagepool/entities/message.go b/internal/queue/entities/message.go similarity index 98% rename from internal/messagepool/entities/message.go rename to internal/queue/entities/message.go index 938366e..f1d3e29 100644 --- a/internal/messagepool/entities/message.go +++ b/internal/queue/entities/message.go @@ -4,7 +4,7 @@ import ( "strings" "time" - "github.com/takenet/deckard/internal/messagepool/utils" + "github.com/takenet/deckard/internal/queue/utils" "google.golang.org/protobuf/types/known/anypb" ) diff --git a/internal/messagepool/entities/message_test.go b/internal/queue/entities/message_test.go similarity index 95% rename from internal/messagepool/entities/message_test.go rename to internal/queue/entities/message_test.go index 2862082..8bef693 100644 --- a/internal/messagepool/entities/message_test.go +++ b/internal/queue/entities/message_test.go @@ -4,7 +4,7 @@ import ( "testing" "github.com/stretchr/testify/require" - "github.com/takenet/deckard/internal/messagepool/utils" + "github.com/takenet/deckard/internal/queue/utils" ) func TestMaxScore(t *testing.T) { diff --git a/internal/messagepool/entities/queue_configuration.go b/internal/queue/entities/queue_configuration.go similarity index 100% rename from internal/messagepool/entities/queue_configuration.go rename to internal/queue/entities/queue_configuration.go diff --git a/internal/messagepool/message_pool.go b/internal/queue/queue.go similarity index 75% rename from internal/messagepool/message_pool.go rename to internal/queue/queue.go index 1657155..7a19b4a 100644 --- a/internal/messagepool/message_pool.go +++ b/internal/queue/queue.go @@ -1,6 +1,6 @@ -package messagepool +package queue -//go:generate mockgen -destination=../mocks/mock_message_pool.go -package=mocks -source=message_pool.go +//go:generate mockgen -destination=../mocks/mock_queue.go -package=mocks -source=queue.go import ( "context" @@ -12,15 +12,14 @@ import ( "github.com/elliotchance/orderedmap/v2" "github.com/takenet/deckard/internal/audit" "github.com/takenet/deckard/internal/logger" - "github.com/takenet/deckard/internal/messagepool/cache" - "github.com/takenet/deckard/internal/messagepool/entities" - "github.com/takenet/deckard/internal/messagepool/queue" - "github.com/takenet/deckard/internal/messagepool/storage" "github.com/takenet/deckard/internal/metrics" + "github.com/takenet/deckard/internal/queue/cache" + "github.com/takenet/deckard/internal/queue/entities" + "github.com/takenet/deckard/internal/queue/storage" "go.opentelemetry.io/otel/attribute" ) -type DeckardMessagePool interface { +type DeckardQueue interface { AddMessagesToCache(ctx context.Context, messages ...*entities.Message) (int64, error) AddMessagesToStorage(ctx context.Context, messages ...*entities.Message) (inserted int64, updated int64, err error) Nack(ctx context.Context, message *entities.Message, timestamp time.Time, reason string) (bool, error) @@ -36,27 +35,27 @@ type DeckardMessagePool interface { Flush(ctx context.Context) (bool, error) } -type MessagePool struct { +type Queue struct { storage storage.Storage cache cache.Cache auditor audit.Auditor - QueueConfigurationService queue.ConfigurationService + QueueConfigurationService QueueConfigurationService } -var _ DeckardMessagePool = &MessagePool{} +var _ DeckardQueue = &Queue{} -func NewMessagePool(auditor audit.Auditor, storageImpl storage.Storage, queueService queue.ConfigurationService, cache cache.Cache) *MessagePool { - messagePool := MessagePool{ +func NewQueue(auditor audit.Auditor, storageImpl storage.Storage, queueService QueueConfigurationService, cache cache.Cache) *Queue { + queue := Queue{ cache: cache, storage: storageImpl, auditor: auditor, QueueConfigurationService: queueService, } - return &messagePool + return &queue } -func (pool *MessagePool) Count(ctx context.Context, opts *storage.FindOptions) (int64, error) { +func (pool *Queue) Count(ctx context.Context, opts *storage.FindOptions) (int64, error) { if opts == nil { opts = &storage.FindOptions{} } @@ -74,7 +73,7 @@ func (pool *MessagePool) Count(ctx context.Context, opts *storage.FindOptions) ( return result, nil } -func (pool *MessagePool) GetStorageMessages(ctx context.Context, opt *storage.FindOptions) ([]entities.Message, error) { +func (pool *Queue) GetStorageMessages(ctx context.Context, opt *storage.FindOptions) ([]entities.Message, error) { result, err := pool.storage.Find(ctx, opt) if err != nil { @@ -86,7 +85,7 @@ func (pool *MessagePool) GetStorageMessages(ctx context.Context, opt *storage.Fi return result, nil } -func (pool *MessagePool) AddMessagesToStorage(ctx context.Context, messages ...*entities.Message) (inserted int64, updated int64, err error) { +func (pool *Queue) AddMessagesToStorage(ctx context.Context, messages ...*entities.Message) (inserted int64, updated int64, err error) { queues := make(map[string]bool) for i := range messages { @@ -110,11 +109,11 @@ func (pool *MessagePool) AddMessagesToStorage(ctx context.Context, messages ...* return insertions, updates, err } -func (pool *MessagePool) AddMessagesToCache(ctx context.Context, messages ...*entities.Message) (int64, error) { +func (pool *Queue) AddMessagesToCache(ctx context.Context, messages ...*entities.Message) (int64, error) { return pool.AddMessagesToCacheWithAuditReason(ctx, "", messages...) } -func (pool *MessagePool) AddMessagesToCacheWithAuditReason(ctx context.Context, reason string, messages ...*entities.Message) (int64, error) { +func (pool *Queue) AddMessagesToCacheWithAuditReason(ctx context.Context, reason string, messages ...*entities.Message) (int64, error) { membersByQueue := make(map[string][]*entities.Message) for i := range messages { queueMessages, ok := membersByQueue[messages[i].Queue] @@ -153,7 +152,7 @@ func (pool *MessagePool) AddMessagesToCacheWithAuditReason(ctx context.Context, return count, nil } -func (pool *MessagePool) Nack(ctx context.Context, message *entities.Message, timestamp time.Time, reason string) (bool, error) { +func (pool *Queue) Nack(ctx context.Context, message *entities.Message, timestamp time.Time, reason string) (bool, error) { if message == nil { return false, nil } @@ -167,7 +166,7 @@ func (pool *MessagePool) Nack(ctx context.Context, message *entities.Message, ti } defer func() { - metrics.MessagePoolNack.Add(ctx, 1, attribute.String("queue", entities.GetQueuePrefix(message.Queue)), attribute.String("reason", reason)) + metrics.QueueNack.Add(ctx, 1, attribute.String("queue", entities.GetQueuePrefix(message.Queue)), attribute.String("reason", reason)) }() if message.LockMs > 0 { @@ -214,7 +213,7 @@ func (pool *MessagePool) Nack(ctx context.Context, message *entities.Message, ti return result, nil } -func (pool *MessagePool) Ack(ctx context.Context, message *entities.Message, timestamp time.Time, reason string) (bool, error) { +func (pool *Queue) Ack(ctx context.Context, message *entities.Message, timestamp time.Time, reason string) (bool, error) { if message == nil { return false, nil } @@ -238,7 +237,7 @@ func (pool *MessagePool) Ack(ctx context.Context, message *entities.Message, tim return false, err } - metrics.MessagePoolAck.Add(ctx, 1, attribute.String("queue", entities.GetQueuePrefix(message.Queue)), attribute.String("reason", reason)) + metrics.QueueAck.Add(ctx, 1, attribute.String("queue", entities.GetQueuePrefix(message.Queue)), attribute.String("reason", reason)) if message.LockMs > 0 { result, err := pool.cache.LockMessage(ctx, message, cache.LOCK_ACK) @@ -281,7 +280,7 @@ func (pool *MessagePool) Ack(ctx context.Context, message *entities.Message, tim return result, nil } -func (pool *MessagePool) TimeoutMessages(ctx context.Context, queue string) ([]string, error) { +func (pool *Queue) TimeoutMessages(ctx context.Context, queue string) ([]string, error) { ids, err := pool.cache.TimeoutMessages(context.Background(), queue, cache.DefaultCacheTimeout) if err != nil { @@ -291,7 +290,7 @@ func (pool *MessagePool) TimeoutMessages(ctx context.Context, queue string) ([]s } if len(ids) > 0 { - metrics.MessagePoolTimeout.Add(ctx, int64(len(ids)), attribute.String("queue", entities.GetQueuePrefix(queue))) + metrics.QueueTimeout.Add(ctx, int64(len(ids)), attribute.String("queue", entities.GetQueuePrefix(queue))) for _, id := range ids { pool.auditor.Store(ctx, audit.Entry{ @@ -305,7 +304,7 @@ func (pool *MessagePool) TimeoutMessages(ctx context.Context, queue string) ([]s return ids, nil } -func (pool *MessagePool) Pull(ctx context.Context, queue string, n int64, scoreFilter int64) (*[]entities.Message, error) { +func (pool *Queue) Pull(ctx context.Context, queue string, n int64, scoreFilter int64) (*[]entities.Message, error) { ids, err := pool.cache.PullMessages(ctx, queue, n, scoreFilter) if err != nil { logger.S(ctx).Error("Error pulling cache elements: ", err) @@ -314,7 +313,7 @@ func (pool *MessagePool) Pull(ctx context.Context, queue string, n int64, scoreF } if len(ids) == 0 { - metrics.MessagePoolEmptyQueue.Add(ctx, 1, attribute.String("queue", entities.GetQueuePrefix(queue))) + metrics.QueueEmptyQueue.Add(ctx, 1, attribute.String("queue", entities.GetQueuePrefix(queue))) return nil, nil } @@ -338,7 +337,7 @@ func (pool *MessagePool) Pull(ctx context.Context, queue string, n int64, scoreF } if len(retryNotFound) > 0 { - metrics.MessagePoolNotFoundInStorage.Add(ctx, int64(len(notFound)), attribute.String("queue", entities.GetQueuePrefix(queue))) + metrics.QueueNotFoundInStorage.Add(ctx, int64(len(notFound)), attribute.String("queue", entities.GetQueuePrefix(queue))) for _, id := range retryNotFound { pool.auditor.Store(ctx, audit.Entry{ @@ -363,7 +362,7 @@ func (pool *MessagePool) Pull(ctx context.Context, queue string, n int64, scoreF } if len(messages) == 0 { - metrics.MessagePoolEmptyQueueStorage.Add(ctx, 1, attribute.String("queue", entities.GetQueuePrefix(queue))) + metrics.QueueEmptyQueueStorage.Add(ctx, 1, attribute.String("queue", entities.GetQueuePrefix(queue))) return nil, nil } @@ -371,7 +370,7 @@ func (pool *MessagePool) Pull(ctx context.Context, queue string, n int64, scoreF return &messages, nil } -func (pool *MessagePool) getFromStorage(ctx context.Context, ids []string, queue string, sort *orderedmap.OrderedMap[string, int], retry bool) ([]entities.Message, []string, error) { +func (pool *Queue) getFromStorage(ctx context.Context, ids []string, queue string, sort *orderedmap.OrderedMap[string, int], retry bool) ([]entities.Message, []string, error) { messages, err := pool.storage.Find(ctx, &storage.FindOptions{ Sort: sort, InternalFilter: &storage.InternalFilter{ @@ -393,7 +392,7 @@ func (pool *MessagePool) getFromStorage(ctx context.Context, ids []string, queue return messages, notFound, nil } -func (pool *MessagePool) Remove(ctx context.Context, queue string, reason string, ids ...string) (cacheRemoved int64, storageRemoved int64, err error) { +func (pool *Queue) Remove(ctx context.Context, queue string, reason string, ids ...string) (cacheRemoved int64, storageRemoved int64, err error) { cacheCount, err := pool.cache.Remove(ctx, queue, ids...) if err != nil { @@ -422,7 +421,7 @@ func (pool *MessagePool) Remove(ctx context.Context, queue string, reason string return cacheCount, storageCount, nil } -func (pool *MessagePool) Flush(ctx context.Context) (bool, error) { +func (pool *Queue) Flush(ctx context.Context) (bool, error) { pool.cache.Flush(ctx) _, err := pool.storage.Flush(ctx) diff --git a/internal/messagepool/queue/configuration_service.go b/internal/queue/queue_configuration_service.go similarity index 67% rename from internal/messagepool/queue/configuration_service.go rename to internal/queue/queue_configuration_service.go index 335359e..ff8bf86 100644 --- a/internal/messagepool/queue/configuration_service.go +++ b/internal/queue/queue_configuration_service.go @@ -5,22 +5,22 @@ import ( "time" "github.com/patrickmn/go-cache" - "github.com/takenet/deckard/internal/messagepool/entities" - "github.com/takenet/deckard/internal/messagepool/storage" + "github.com/takenet/deckard/internal/queue/entities" + "github.com/takenet/deckard/internal/queue/storage" ) -type ConfigurationService interface { +type QueueConfigurationService interface { EditQueueConfiguration(ctx context.Context, configuration *entities.QueueConfiguration) error GetQueueConfiguration(ctx context.Context, queue string) (*entities.QueueConfiguration, error) } -type DefaultConfigurationService struct { +type DefaultQueueConfigurationService struct { storage storage.Storage localCache *cache.Cache } -func NewConfigurationService(_ context.Context, storage storage.Storage) *DefaultConfigurationService { - service := &DefaultConfigurationService{} +func NewQueueConfigurationService(_ context.Context, storage storage.Storage) *DefaultQueueConfigurationService { + service := &DefaultQueueConfigurationService{} service.localCache = cache.New(9*time.Minute, 1*time.Minute) service.storage = storage @@ -28,9 +28,9 @@ func NewConfigurationService(_ context.Context, storage storage.Storage) *Defaul return service } -var _ ConfigurationService = &DefaultConfigurationService{} +var _ QueueConfigurationService = &DefaultQueueConfigurationService{} -func (queueService *DefaultConfigurationService) EditQueueConfiguration(ctx context.Context, cfg *entities.QueueConfiguration) error { +func (queueService *DefaultQueueConfigurationService) EditQueueConfiguration(ctx context.Context, cfg *entities.QueueConfiguration) error { if cfg == nil { return nil } @@ -57,7 +57,7 @@ func (queueService *DefaultConfigurationService) EditQueueConfiguration(ctx cont return nil } -func (queueService *DefaultConfigurationService) GetQueueConfiguration(ctx context.Context, queue string) (*entities.QueueConfiguration, error) { +func (queueService *DefaultQueueConfigurationService) GetQueueConfiguration(ctx context.Context, queue string) (*entities.QueueConfiguration, error) { cacheConfig, found := queueService.localCache.Get(queue) if found { diff --git a/internal/messagepool/queue/configuration_service_test.go b/internal/queue/queue_configuration_service_test.go similarity index 62% rename from internal/messagepool/queue/configuration_service_test.go rename to internal/queue/queue_configuration_service_test.go index 68239b6..421c8d3 100644 --- a/internal/messagepool/queue/configuration_service_test.go +++ b/internal/queue/queue_configuration_service_test.go @@ -8,11 +8,11 @@ import ( "github.com/golang/mock/gomock" "github.com/patrickmn/go-cache" "github.com/stretchr/testify/require" - "github.com/takenet/deckard/internal/messagepool/entities" "github.com/takenet/deckard/internal/mocks" + "github.com/takenet/deckard/internal/queue/entities" ) -var ctx = context.Background() +var configurationCtx = context.Background() func TestCreateQueueConfigurationShouldCreateCache(t *testing.T) { t.Parallel() @@ -22,7 +22,7 @@ func TestCreateQueueConfigurationShouldCreateCache(t *testing.T) { mockStorage := mocks.NewMockStorage(mockCtrl) - configuration := NewConfigurationService(ctx, mockStorage) + configuration := NewQueueConfigurationService(configurationCtx, mockStorage) require.NotNil(t, configuration.localCache) require.NotNil(t, configuration.storage) @@ -31,17 +31,17 @@ func TestCreateQueueConfigurationShouldCreateCache(t *testing.T) { func TestEditConfigurationNilConfigurationShouldDoNothing(t *testing.T) { t.Parallel() - configuration := NewConfigurationService(ctx, nil) + configuration := NewQueueConfigurationService(configurationCtx, nil) - require.NoError(t, configuration.EditQueueConfiguration(ctx, nil)) + require.NoError(t, configuration.EditQueueConfiguration(configurationCtx, nil)) } func TestEditConfigurationMaxElementsZeroShouldDoNothing(t *testing.T) { t.Parallel() - configuration := NewConfigurationService(ctx, nil) + configuration := NewQueueConfigurationService(configurationCtx, nil) - require.NoError(t, configuration.EditQueueConfiguration(ctx, &entities.QueueConfiguration{MaxElements: 0})) + require.NoError(t, configuration.EditQueueConfiguration(configurationCtx, &entities.QueueConfiguration{MaxElements: 0})) } func TestEditConfigurationCacheNotFoundShouldCallStorageEdit(t *testing.T) { @@ -53,22 +53,22 @@ func TestEditConfigurationCacheNotFoundShouldCallStorageEdit(t *testing.T) { config := &entities.QueueConfiguration{MaxElements: 321, Queue: "q1"} mockStorage := mocks.NewMockStorage(mockCtrl) - mockStorage.EXPECT().EditQueueConfiguration(ctx, config).Return(nil) - configuration := NewConfigurationService(ctx, mockStorage) + mockStorage.EXPECT().EditQueueConfiguration(configurationCtx, config).Return(nil) + configuration := NewQueueConfigurationService(configurationCtx, mockStorage) - require.NoError(t, configuration.EditQueueConfiguration(ctx, config)) + require.NoError(t, configuration.EditQueueConfiguration(configurationCtx, config)) } func TestEditConfigurationCacheFoundWithSameConfigShouldDoNothing(t *testing.T) { t.Parallel() - configuration := NewConfigurationService(ctx, nil) + configuration := NewQueueConfigurationService(configurationCtx, nil) config := &entities.QueueConfiguration{MaxElements: 321, Queue: "q1"} configuration.localCache.Set("q1", config, cache.DefaultExpiration) - require.NoError(t, configuration.EditQueueConfiguration(ctx, config)) + require.NoError(t, configuration.EditQueueConfiguration(configurationCtx, config)) } func TestEditConfigurationCacheFoundWithDifferentConfigShouldCallStorageAndDeleteCache(t *testing.T) { @@ -80,13 +80,13 @@ func TestEditConfigurationCacheFoundWithDifferentConfigShouldCallStorageAndDelet config := &entities.QueueConfiguration{MaxElements: 321, Queue: "q1"} mockStorage := mocks.NewMockStorage(mockCtrl) - mockStorage.EXPECT().EditQueueConfiguration(ctx, config).Return(nil) + mockStorage.EXPECT().EditQueueConfiguration(configurationCtx, config).Return(nil) - configuration := NewConfigurationService(ctx, mockStorage) + configuration := NewQueueConfigurationService(configurationCtx, mockStorage) configuration.localCache.Set("q1", &entities.QueueConfiguration{MaxElements: 123, Queue: "q1"}, cache.DefaultExpiration) - require.NoError(t, configuration.EditQueueConfiguration(ctx, config)) + require.NoError(t, configuration.EditQueueConfiguration(configurationCtx, config)) result, found := configuration.localCache.Get("q1") require.False(t, found) @@ -98,11 +98,11 @@ func TestGetConfigurationFromCacheShouldResultFromCache(t *testing.T) { config := &entities.QueueConfiguration{MaxElements: 321, Queue: "q1"} - configuration := NewConfigurationService(ctx, nil) + configuration := NewQueueConfigurationService(configurationCtx, nil) configuration.localCache.Set("q1", config, cache.DefaultExpiration) - result, err := configuration.GetQueueConfiguration(ctx, "q1") + result, err := configuration.GetQueueConfiguration(configurationCtx, "q1") require.NoError(t, err) require.Same(t, config, result) } @@ -114,11 +114,11 @@ func TestGetConfigurationCacheMissStorageErrorShouldResultError(t *testing.T) { defer mockCtrl.Finish() mockStorage := mocks.NewMockStorage(mockCtrl) - mockStorage.EXPECT().GetQueueConfiguration(ctx, "q1").Return(nil, fmt.Errorf("anyerr")) + mockStorage.EXPECT().GetQueueConfiguration(configurationCtx, "q1").Return(nil, fmt.Errorf("anyerr")) - configuration := NewConfigurationService(ctx, mockStorage) + configuration := NewQueueConfigurationService(configurationCtx, mockStorage) - result, err := configuration.GetQueueConfiguration(ctx, "q1") + result, err := configuration.GetQueueConfiguration(configurationCtx, "q1") require.Error(t, err) require.Nil(t, result) } @@ -130,14 +130,14 @@ func TestGetConfigurationCacheMissStorageNotFoundShouldResultDefaultConfiguratio defer mockCtrl.Finish() mockStorage := mocks.NewMockStorage(mockCtrl) - mockStorage.EXPECT().GetQueueConfiguration(ctx, "q1").Return(nil, nil) + mockStorage.EXPECT().GetQueueConfiguration(configurationCtx, "q1").Return(nil, nil) - configuration := NewConfigurationService(ctx, mockStorage) + configuration := NewQueueConfigurationService(configurationCtx, mockStorage) _, found := configuration.localCache.Get("q1") require.False(t, found) - result, err := configuration.GetQueueConfiguration(ctx, "q1") + result, err := configuration.GetQueueConfiguration(configurationCtx, "q1") require.NoError(t, err) require.Equal(t, &entities.QueueConfiguration{ Queue: "q1", @@ -161,14 +161,14 @@ func TestGetConfigurationCacheMissStorageFoundShouldResultStorageConfigurationAn Queue: "q1", MaxElements: 534, } - mockStorage.EXPECT().GetQueueConfiguration(ctx, "q1").Return(storageConfig, nil) + mockStorage.EXPECT().GetQueueConfiguration(configurationCtx, "q1").Return(storageConfig, nil) - configuration := NewConfigurationService(ctx, mockStorage) + configuration := NewQueueConfigurationService(configurationCtx, mockStorage) _, found := configuration.localCache.Get("q1") require.False(t, found) - result, err := configuration.GetQueueConfiguration(ctx, "q1") + result, err := configuration.GetQueueConfiguration(configurationCtx, "q1") require.NoError(t, err) require.Same(t, storageConfig, result) diff --git a/internal/messagepool/message_pool_housekeeper.go b/internal/queue/queue_housekeeper.go similarity index 91% rename from internal/messagepool/message_pool_housekeeper.go rename to internal/queue/queue_housekeeper.go index cfbc0dd..3d685af 100644 --- a/internal/messagepool/message_pool_housekeeper.go +++ b/internal/queue/queue_housekeeper.go @@ -1,4 +1,4 @@ -package messagepool +package queue import ( "context" @@ -7,11 +7,11 @@ import ( "github.com/elliotchance/orderedmap/v2" "github.com/takenet/deckard/internal/audit" "github.com/takenet/deckard/internal/logger" - "github.com/takenet/deckard/internal/messagepool/cache" - "github.com/takenet/deckard/internal/messagepool/entities" - "github.com/takenet/deckard/internal/messagepool/storage" - "github.com/takenet/deckard/internal/messagepool/utils" "github.com/takenet/deckard/internal/metrics" + "github.com/takenet/deckard/internal/queue/cache" + "github.com/takenet/deckard/internal/queue/entities" + "github.com/takenet/deckard/internal/queue/storage" + "github.com/takenet/deckard/internal/queue/utils" "github.com/takenet/deckard/internal/shutdown" "go.opentelemetry.io/otel/attribute" ) @@ -21,7 +21,7 @@ import ( // TODO: allow each queue to have its own deadline for timeout. // TODO: change the behavior of this so it doesn't need to load all queue names in memory, we could use the storage to list queues with a cursor // TODO: we could even change the timeout mechanism to be not based on the queue name -func ProcessTimeoutMessages(ctx context.Context, pool *MessagePool) error { +func ProcessTimeoutMessages(ctx context.Context, pool *Queue) error { t := time.Now() queues, err := pool.cache.ListQueues(ctx, "*", entities.PROCESSING_POOL) @@ -70,7 +70,7 @@ func ProcessTimeoutMessages(ctx context.Context, pool *MessagePool) error { } // processLockPool moves messages from the lock message pool to the message pool. -func ProcessLockPool(ctx context.Context, pool *MessagePool) { +func ProcessLockPool(ctx context.Context, pool *Queue) { lockAckQueues, err := pool.cache.ListQueues(ctx, "*", entities.LOCK_ACK_POOL) if err != nil { @@ -96,7 +96,7 @@ func ProcessLockPool(ctx context.Context, pool *MessagePool) { unlockMessages(ctx, pool, lockNackQueues, cache.LOCK_NACK) } -func unlockMessages(ctx context.Context, pool *MessagePool, queues []string, lockType cache.LockType) { +func unlockMessages(ctx context.Context, pool *Queue, queues []string, lockType cache.LockType) { for i := range queues { if shutdown.Ongoing() { logger.S(ctx).Info("Shutdown started. Stopping unlock process.") @@ -125,7 +125,7 @@ func unlockMessages(ctx context.Context, pool *MessagePool, queues []string, loc } } -func isRecovering(ctx context.Context, pool *MessagePool) (bool, error) { +func isRecovering(ctx context.Context, pool *Queue) (bool, error) { recovery, err := pool.cache.Get(ctx, cache.RECOVERY_RUNNING) if err != nil { logger.S(ctx).Error("Error to get full recovery status: ", err) @@ -137,7 +137,7 @@ func isRecovering(ctx context.Context, pool *MessagePool) (bool, error) { } // RecoveryMessagesPool recover messages pool sending all storage data to cache -func RecoveryMessagesPool(ctx context.Context, pool *MessagePool) (metrify bool) { +func RecoveryMessagesPool(ctx context.Context, pool *Queue) (metrify bool) { t := time.Now() breakpoint, err := pool.cache.Get(ctx, cache.RECOVERY_STORAGE_BREAKPOINT_KEY) @@ -223,7 +223,7 @@ func RecoveryMessagesPool(ctx context.Context, pool *MessagePool) (metrify bool) return true } -func tryToStartRecovery(ctx context.Context, pool *MessagePool) bool { +func tryToStartRecovery(ctx context.Context, pool *Queue) bool { logger.S(ctx).Info("Starting full cache recovery.") err := pool.cache.Set(ctx, cache.RECOVERY_RUNNING, "true") @@ -269,7 +269,7 @@ func tryToStartRecovery(ctx context.Context, pool *MessagePool) bool { } // Remove 10000 expired elements ordered by expiration date asc for each queue. -func RemoveTTLMessages(ctx context.Context, pool *MessagePool, filterDate *time.Time) (bool, error) { +func RemoveTTLMessages(ctx context.Context, pool *Queue, filterDate *time.Time) (bool, error) { if isRecovering, err := isRecovering(ctx, pool); isRecovering || err != nil { return false, err } @@ -332,7 +332,7 @@ func RemoveTTLMessages(ctx context.Context, pool *MessagePool, filterDate *time. // Checks if there is any queue with max_elements configuration and // then remove every exceeding messages using expiry_date to sort which elements will be removed // TODO manage message pool update individually by queue to avoid future bottlenecks -func RemoveExceedingMessages(ctx context.Context, pool *MessagePool) (bool, error) { +func RemoveExceedingMessages(ctx context.Context, pool *Queue) (bool, error) { if isRecovering, err := isRecovering(ctx, pool); isRecovering || err != nil { return false, err } @@ -358,7 +358,7 @@ func RemoveExceedingMessages(ctx context.Context, pool *MessagePool) (bool, erro return true, nil } -func (pool *MessagePool) removeExceedingMessagesFromQueue(ctx context.Context, queueConfiguration *entities.QueueConfiguration) error { +func (pool *Queue) removeExceedingMessagesFromQueue(ctx context.Context, queueConfiguration *entities.QueueConfiguration) error { if queueConfiguration == nil || queueConfiguration.MaxElements <= 0 { return nil } @@ -420,7 +420,7 @@ func (pool *MessagePool) removeExceedingMessagesFromQueue(ctx context.Context, q return nil } -func ComputeMetrics(ctx context.Context, pool *MessagePool) { +func ComputeMetrics(ctx context.Context, pool *Queue) { queues, err := pool.storage.ListQueuePrefixes(ctx) if err != nil { diff --git a/internal/messagepool/message_pool_housekeeper_test.go b/internal/queue/queue_housekeeper_test.go similarity index 91% rename from internal/messagepool/message_pool_housekeeper_test.go rename to internal/queue/queue_housekeeper_test.go index 0a6c81d..219f797 100644 --- a/internal/messagepool/message_pool_housekeeper_test.go +++ b/internal/queue/queue_housekeeper_test.go @@ -1,4 +1,4 @@ -package messagepool +package queue import ( "context" @@ -12,16 +12,15 @@ import ( "github.com/golang/mock/gomock" "github.com/stretchr/testify/require" "github.com/takenet/deckard/internal/audit" - "github.com/takenet/deckard/internal/messagepool/cache" - "github.com/takenet/deckard/internal/messagepool/entities" - "github.com/takenet/deckard/internal/messagepool/queue" - "github.com/takenet/deckard/internal/messagepool/storage" "github.com/takenet/deckard/internal/metrics" "github.com/takenet/deckard/internal/mocks" + "github.com/takenet/deckard/internal/queue/cache" + "github.com/takenet/deckard/internal/queue/entities" + "github.com/takenet/deckard/internal/queue/storage" "go.mongodb.org/mongo-driver/bson/primitive" ) -func TestUpdateOldestMessagePoolMap(t *testing.T) { +func TestUpdateOldestQueueMap(t *testing.T) { t.Parallel() mockCtrl := gomock.NewController(t) @@ -70,7 +69,7 @@ func TestUpdateOldestMessagePoolMap(t *testing.T) { }).Return(int64(11), nil) mockAuditor := mocks.NewMockAuditor(mockCtrl) - q := NewMessagePool(mockAuditor, mockStorage, nil, mockCache) + q := NewQueue(mockAuditor, mockStorage, nil, mockCache) ComputeMetrics(ctx, q) @@ -103,7 +102,7 @@ func TestProcessLockPool(t *testing.T) { mockStorage := mocks.NewMockStorage(mockCtrl) mockAuditor := mocks.NewMockAuditor(mockCtrl) - q := NewMessagePool(mockAuditor, mockStorage, nil, mockCache) + q := NewQueue(mockAuditor, mockStorage, nil, mockCache) ProcessLockPool(ctx, q) } @@ -122,7 +121,7 @@ func TestProcessLockPoolAckListErrorShouldDoNothing(t *testing.T) { mockStorage := mocks.NewMockStorage(mockCtrl) mockAuditor := mocks.NewMockAuditor(mockCtrl) - q := NewMessagePool(mockAuditor, mockStorage, nil, mockCache) + q := NewQueue(mockAuditor, mockStorage, nil, mockCache) ProcessLockPool(ctx, q) } @@ -142,7 +141,7 @@ func TestProcessLockPoolNackAckListErrorShouldDoNothing(t *testing.T) { mockStorage := mocks.NewMockStorage(mockCtrl) mockAuditor := mocks.NewMockAuditor(mockCtrl) - q := NewMessagePool(mockAuditor, mockStorage, nil, mockCache) + q := NewQueue(mockAuditor, mockStorage, nil, mockCache) ProcessLockPool(ctx, q) } @@ -167,7 +166,7 @@ func TestProcessUnlockErrorShouldUnlockOthers(t *testing.T) { mockStorage := mocks.NewMockStorage(mockCtrl) mockAuditor := mocks.NewMockAuditor(mockCtrl) - q := NewMessagePool(mockAuditor, mockStorage, nil, mockCache) + q := NewQueue(mockAuditor, mockStorage, nil, mockCache) ProcessLockPool(ctx, q) } @@ -243,7 +242,7 @@ func TestRecoveryMessagesCacheError(t *testing.T) { Limit: int64(4000), }).Return(storageMessages, nil) - q := NewMessagePool(&audit.AuditorImpl{}, mockStorage, nil, mockCache) + q := NewQueue(&audit.AuditorImpl{}, mockStorage, nil, mockCache) RecoveryMessagesPool(ctx, q) } @@ -259,7 +258,7 @@ func TestCheckTimeoutMessagesListQueueErrorShouldDoNothing(t *testing.T) { mockCache := mocks.NewMockCache(mockCtrl) mockCache.EXPECT().ListQueues(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil, errors.New("error list queues")) - q := NewMessagePool(&audit.AuditorImpl{}, mockStorage, nil, mockCache) + q := NewQueue(&audit.AuditorImpl{}, mockStorage, nil, mockCache) _ = ProcessTimeoutMessages(ctx, q) } @@ -277,7 +276,7 @@ func TestCheckTimeoutMessagesErrorForQueueShouldContinueOtherQueues(t *testing.T mockCache.EXPECT().TimeoutMessages(gomock.Any(), "q1", cache.DefaultCacheTimeout).Return(nil, errors.New("error timeout messages")) mockCache.EXPECT().TimeoutMessages(gomock.Any(), "q2", cache.DefaultCacheTimeout).Return([]string{"id2"}, nil) - q := NewMessagePool(&audit.AuditorImpl{}, mockStorage, nil, mockCache) + q := NewQueue(&audit.AuditorImpl{}, mockStorage, nil, mockCache) _ = ProcessTimeoutMessages(ctx, q) } @@ -365,7 +364,7 @@ func TestRecoveryMessagesPoolShouldAddMessagesAfterBreakpoint(t *testing.T) { LastScoreSubtract: 23457, }).Return("65456") - q := NewMessagePool(&audit.AuditorImpl{}, mockStorage, nil, mockCache) + q := NewQueue(&audit.AuditorImpl{}, mockStorage, nil, mockCache) RecoveryMessagesPool(ctx, q) } @@ -395,7 +394,7 @@ func TestRecoveryMessagesPoolInitWithEmptyStorageShouldNotStartRecovery(t *testi Limit: int64(1), }).Return([]entities.Message{}, nil) - q := NewMessagePool(&audit.AuditorImpl{}, mockStorage, nil, mockCache) + q := NewQueue(&audit.AuditorImpl{}, mockStorage, nil, mockCache) RecoveryMessagesPool(ctx, q) } @@ -472,7 +471,7 @@ func TestRecoveryMessagesPoolInitNonEmptyStorageShouldStartRecovery(t *testing.T }).Return(storageMessages, nil) mockStorage.EXPECT().GetStringInternalId(ctx, &storageMessages[len(storageMessages)-1]).Return(storageNotLast.Hex()) - q := NewMessagePool(&audit.AuditorImpl{}, mockStorage, nil, mockCache) + q := NewQueue(&audit.AuditorImpl{}, mockStorage, nil, mockCache) RecoveryMessagesPool(ctx, q) } @@ -562,7 +561,7 @@ func TestRecoveryMessagesPoolAlreadyRunning(t *testing.T) { LastScoreSubtract: 23457, }).Return("65456") - q := NewMessagePool(&audit.AuditorImpl{}, mockStorage, nil, mockCache) + q := NewQueue(&audit.AuditorImpl{}, mockStorage, nil, mockCache) RecoveryMessagesPool(ctx, q) } @@ -577,7 +576,7 @@ func TestRecoveryMessagesPoolStorageBreakpointError(t *testing.T) { mockCache.EXPECT().Get(ctx, cache.RECOVERY_STORAGE_BREAKPOINT_KEY).Return("", errors.New("storage breakpoint error")) mockStorage := mocks.NewMockStorage(mockCtrl) - q := NewMessagePool(&audit.AuditorImpl{}, mockStorage, nil, mockCache) + q := NewQueue(&audit.AuditorImpl{}, mockStorage, nil, mockCache) RecoveryMessagesPool(ctx, q) } @@ -593,7 +592,7 @@ func TestRecoveryMessagesPoolRecoveryRunningError(t *testing.T) { mockCache.EXPECT().Get(ctx, cache.RECOVERY_RUNNING).Return("", errors.New("recovery running error")) mockStorage := mocks.NewMockStorage(mockCtrl) - q := NewMessagePool(&audit.AuditorImpl{}, mockStorage, nil, mockCache) + q := NewQueue(&audit.AuditorImpl{}, mockStorage, nil, mockCache) RecoveryMessagesPool(ctx, q) } @@ -610,7 +609,7 @@ func TestRecoveryMessagesPoolRecoveryBreakpointRunningError(t *testing.T) { mockCache.EXPECT().Get(ctx, cache.RECOVERY_BREAKPOINT_KEY).Return("", errors.New("recovery running error")) mockStorage := mocks.NewMockStorage(mockCtrl) - q := NewMessagePool(&audit.AuditorImpl{}, mockStorage, nil, mockCache) + q := NewQueue(&audit.AuditorImpl{}, mockStorage, nil, mockCache) RecoveryMessagesPool(ctx, q) } @@ -647,7 +646,7 @@ func TestRecoveryMessagesPoolStorageError(t *testing.T) { Limit: int64(4000), }).Return(nil, errors.New("storage errors")) - q := NewMessagePool(&audit.AuditorImpl{}, mockStorage, nil, mockCache) + q := NewQueue(&audit.AuditorImpl{}, mockStorage, nil, mockCache) RecoveryMessagesPool(ctx, q) } @@ -693,7 +692,7 @@ func TestRemoveTTLMessagesShouldRemoveExpiredElements(t *testing.T) { mockCache.EXPECT().Remove(gomock.Any(), "q2", []string{"2"}).Return(int64(1), nil) mockCache.EXPECT().Get(gomock.Any(), cache.RECOVERY_RUNNING).Return("", nil) - q := NewMessagePool(&audit.AuditorImpl{}, mockStorage, queue.NewConfigurationService(ctx, mockStorage), mockCache) + q := NewQueue(&audit.AuditorImpl{}, mockStorage, NewQueueConfigurationService(ctx, mockStorage), mockCache) result, err := RemoveTTLMessages(ctx, q, &now) require.NoError(t, err) @@ -716,7 +715,7 @@ func TestCheckTimeoutMessagesShouldExecuteTimeoutToAllQueues(t *testing.T) { mockCache.EXPECT().TimeoutMessages(gomock.Any(), "q1", cache.DefaultCacheTimeout).Return([]string{"id1"}, nil) mockCache.EXPECT().TimeoutMessages(gomock.Any(), "q2", cache.DefaultCacheTimeout).Return([]string{"id2"}, nil) - q := NewMessagePool(&audit.AuditorImpl{}, mockStorage, nil, mockCache) + q := NewQueue(&audit.AuditorImpl{}, mockStorage, nil, mockCache) _ = ProcessTimeoutMessages(ctx, q) } @@ -733,7 +732,7 @@ func TestRemoveExceedingMessagesNoQueuesShouldDoNothing(t *testing.T) { mockCache := mocks.NewMockCache(mockCtrl) mockCache.EXPECT().Get(gomock.Any(), cache.RECOVERY_RUNNING).Return("", nil) - q := NewMessagePool(&audit.AuditorImpl{}, mockStorage, nil, mockCache) + q := NewQueue(&audit.AuditorImpl{}, mockStorage, nil, mockCache) _, _ = RemoveExceedingMessages(ctx, q) } @@ -750,7 +749,7 @@ func TestRemoveExceedingMessagesListErrorShouldDoNothing(t *testing.T) { mockCache := mocks.NewMockCache(mockCtrl) mockCache.EXPECT().Get(gomock.Any(), cache.RECOVERY_RUNNING).Return("", nil) - q := NewMessagePool(&audit.AuditorImpl{}, mockStorage, nil, mockCache) + q := NewQueue(&audit.AuditorImpl{}, mockStorage, nil, mockCache) _, _ = RemoveExceedingMessages(ctx, q) } @@ -767,7 +766,7 @@ func TestRemoveExceedingMessagesNoQueuesShouldCallRemoveMethodToEachQueue(t *tes mockCache := mocks.NewMockCache(mockCtrl) mockCache.EXPECT().Get(gomock.Any(), cache.RECOVERY_RUNNING).Return("", nil) - q := NewMessagePool(&audit.AuditorImpl{}, mockStorage, queue.NewConfigurationService(ctx, mockStorage), mockCache) + q := NewQueue(&audit.AuditorImpl{}, mockStorage, NewQueueConfigurationService(ctx, mockStorage), mockCache) _, _ = RemoveExceedingMessages(ctx, q) } @@ -782,7 +781,7 @@ func TestRemoveExceedingMessagesRecoveryRunning(t *testing.T) { mockCache := mocks.NewMockCache(mockCtrl) mockCache.EXPECT().Get(gomock.Any(), cache.RECOVERY_RUNNING).Return("true", nil) - q := NewMessagePool(&audit.AuditorImpl{}, mockStorage, queue.NewConfigurationService(ctx, mockStorage), mockCache) + q := NewQueue(&audit.AuditorImpl{}, mockStorage, NewQueueConfigurationService(ctx, mockStorage), mockCache) _, _ = RemoveExceedingMessages(ctx, q) } @@ -797,7 +796,7 @@ func TestRemoveTTLMessagesRecoveryRunning(t *testing.T) { mockCache := mocks.NewMockCache(mockCtrl) mockCache.EXPECT().Get(gomock.Any(), cache.RECOVERY_RUNNING).Return("true", nil) - q := NewMessagePool(&audit.AuditorImpl{}, mockStorage, queue.NewConfigurationService(ctx, mockStorage), mockCache) + q := NewQueue(&audit.AuditorImpl{}, mockStorage, NewQueueConfigurationService(ctx, mockStorage), mockCache) _, _ = RemoveTTLMessages(ctx, q, &time.Time{}) } diff --git a/internal/messagepool/message_pool_test.go b/internal/queue/queue_test.go similarity index 87% rename from internal/messagepool/message_pool_test.go rename to internal/queue/queue_test.go index c6521d3..b811fb1 100644 --- a/internal/messagepool/message_pool_test.go +++ b/internal/queue/queue_test.go @@ -1,4 +1,4 @@ -package messagepool +package queue import ( "context" @@ -11,11 +11,10 @@ import ( "github.com/golang/mock/gomock" "github.com/stretchr/testify/require" "github.com/takenet/deckard/internal/audit" - "github.com/takenet/deckard/internal/messagepool/cache" - "github.com/takenet/deckard/internal/messagepool/entities" - "github.com/takenet/deckard/internal/messagepool/queue" - "github.com/takenet/deckard/internal/messagepool/storage" "github.com/takenet/deckard/internal/mocks" + "github.com/takenet/deckard/internal/queue/cache" + "github.com/takenet/deckard/internal/queue/entities" + "github.com/takenet/deckard/internal/queue/storage" ) var ctx = context.Background() @@ -47,7 +46,7 @@ func TestPull(t *testing.T) { mockCache := mocks.NewMockCache(mockCtrl) mockCache.EXPECT().PullMessages(gomock.Any(), "test", int64(1), int64(0)).Return([]string{"123"}, nil) - q := NewMessagePool(nil, mockStorage, nil, mockCache) + q := NewQueue(nil, mockStorage, nil, mockCache) messages, err := q.Pull(ctx, "test", 1, 0) @@ -73,7 +72,7 @@ func TestAckStorageErrorShouldResultError(t *testing.T) { }).Return(int64(0), errors.New("ack_error")) mockCache := mocks.NewMockCache(mockCtrl) - q := NewMessagePool(nil, mockStorage, nil, mockCache) + q := NewQueue(nil, mockStorage, nil, mockCache) result, err := q.Ack(ctx, &entities.Message{ ID: "id", @@ -103,7 +102,7 @@ func TestAckMakeAvailableErrorShouldResultError(t *testing.T) { mockCache := mocks.NewMockCache(mockCtrl) mockCache.EXPECT().MakeAvailable(gomock.Any(), message).Return(false, errors.New("make available error")) - q := NewMessagePool(nil, mockStorage, nil, mockCache) + q := NewQueue(nil, mockStorage, nil, mockCache) result, err := q.Ack(ctx, &entities.Message{ ID: "id", @@ -144,7 +143,7 @@ func TestAckSuccessfulShouldAudit(t *testing.T) { Reason: "reason", }) - q := NewMessagePool(mockAuditor, mockStorage, nil, mockCache) + q := NewQueue(mockAuditor, mockStorage, nil, mockCache) result, err := q.Ack(ctx, &entities.Message{ ID: "id", @@ -158,7 +157,7 @@ func TestAckSuccessfulShouldAudit(t *testing.T) { func TestAckNilMessage(t *testing.T) { t.Parallel() - q := NewMessagePool(nil, nil, nil, nil) + q := NewQueue(nil, nil, nil, nil) result, err := q.Ack(ctx, nil, time.Time{}, "") @@ -169,7 +168,7 @@ func TestAckNilMessage(t *testing.T) { func TestAckWithoutQueue(t *testing.T) { t.Parallel() - q := NewMessagePool(nil, nil, nil, nil) + q := NewQueue(nil, nil, nil, nil) result, err := q.Ack(ctx, &entities.Message{ID: "1"}, time.Time{}, "") @@ -180,7 +179,7 @@ func TestAckWithoutQueue(t *testing.T) { func TestAckWithoutId(t *testing.T) { t.Parallel() - q := NewMessagePool(nil, nil, nil, nil) + q := NewQueue(nil, nil, nil, nil) result, err := q.Ack(ctx, &entities.Message{Queue: "queue"}, time.Time{}, "") @@ -206,7 +205,7 @@ func TestNackMakeAvailableErrorShouldResultError(t *testing.T) { mockCache := mocks.NewMockCache(mockCtrl) mockCache.EXPECT().MakeAvailable(gomock.Any(), message).Return(false, errors.New("make available error")) - q := NewMessagePool(nil, mockStorage, nil, mockCache) + q := NewQueue(nil, mockStorage, nil, mockCache) result, err := q.Nack(ctx, &entities.Message{ ID: "id", @@ -245,7 +244,7 @@ func TestNackSuccessfulShouldAudit(t *testing.T) { Reason: "reason", }) - q := NewMessagePool(mockAuditor, mockStorage, nil, mockCache) + q := NewQueue(mockAuditor, mockStorage, nil, mockCache) result, err := q.Nack(ctx, &entities.Message{ ID: "id", @@ -259,7 +258,7 @@ func TestNackSuccessfulShouldAudit(t *testing.T) { func TestNackNilMessage(t *testing.T) { t.Parallel() - q := NewMessagePool(nil, nil, nil, nil) + q := NewQueue(nil, nil, nil, nil) result, err := q.Nack(ctx, nil, time.Now(), "") @@ -270,7 +269,7 @@ func TestNackNilMessage(t *testing.T) { func TestNackWithoutQueue(t *testing.T) { t.Parallel() - q := NewMessagePool(nil, nil, nil, nil) + q := NewQueue(nil, nil, nil, nil) result, err := q.Nack(ctx, &entities.Message{ID: "1"}, time.Now(), "") @@ -281,7 +280,7 @@ func TestNackWithoutQueue(t *testing.T) { func TestNackWithoutId(t *testing.T) { t.Parallel() - q := NewMessagePool(nil, nil, nil, nil) + q := NewQueue(nil, nil, nil, nil) result, err := q.Nack(ctx, &entities.Message{Queue: "queue"}, time.Now(), "") @@ -337,7 +336,7 @@ func TestPullShouldDeleteNotFoundInStorageAndReturnRemaining(t *testing.T) { Queue: "test", Signal: audit.MISSING_STORAGE, }) - q := NewMessagePool(mockAuditor, mockStorage, nil, mockCache) + q := NewQueue(mockAuditor, mockStorage, nil, mockCache) messages, err := q.Pull(ctx, "test", 3, 0) @@ -391,7 +390,7 @@ func TestPullElementsFromRetryShouldNotAuditMissingElements(t *testing.T) { mockAuditor := mocks.NewMockAuditor(mockCtrl) - q := NewMessagePool(mockAuditor, mockStorage, nil, mockCache) + q := NewQueue(mockAuditor, mockStorage, nil, mockCache) messages, err := q.Pull(ctx, "test", 3, 0) @@ -449,7 +448,7 @@ func TestPullElementsFromBothFirstTryAndRetryShouldMergeElementsAndKeepScoreOrde mockAuditor := mocks.NewMockAuditor(mockCtrl) - q := NewMessagePool(mockAuditor, mockStorage, nil, mockCache) + q := NewQueue(mockAuditor, mockStorage, nil, mockCache) messages, err := q.Pull(ctx, "test", 3, 0) @@ -510,7 +509,7 @@ func TestPullNothingFoundOnStorage(t *testing.T) { Queue: "test", Signal: audit.MISSING_STORAGE, }) - q := NewMessagePool(mockAuditor, mockStorage, nil, mockCache) + q := NewQueue(mockAuditor, mockStorage, nil, mockCache) messages, err := q.Pull(ctx, "test", 3, 0) @@ -527,7 +526,7 @@ func TestPullCacheError(t *testing.T) { mockCache := mocks.NewMockCache(mockCtrl) mockCache.EXPECT().PullMessages(gomock.Any(), "test", int64(1), int64(0)).Return(nil, errors.New("cache_error")) - q := NewMessagePool(nil, nil, nil, mockCache) + q := NewQueue(nil, nil, nil, mockCache) messages, err := q.Pull(ctx, "test", 1, 0) @@ -544,7 +543,7 @@ func TestPullCacheNoResults(t *testing.T) { mockCache := mocks.NewMockCache(mockCtrl) mockCache.EXPECT().PullMessages(gomock.Any(), "test", int64(1), int64(0)).Return(nil, nil) - q := NewMessagePool(nil, nil, nil, mockCache) + q := NewQueue(nil, nil, nil, mockCache) messages, err := q.Pull(ctx, "test", 1, 0) @@ -578,7 +577,7 @@ func (m isSameEntry) String() string { return fmt.Sprintf("%v", m.value) } -func TestMessagePoolTimeoutError(t *testing.T) { +func TestQueueTimeoutError(t *testing.T) { t.Parallel() mockCtrl := gomock.NewController(t) @@ -587,14 +586,14 @@ func TestMessagePoolTimeoutError(t *testing.T) { mockCache := mocks.NewMockCache(mockCtrl) mockCache.EXPECT().TimeoutMessages(gomock.Any(), "queue_test", cache.DefaultCacheTimeout).Return(nil, errors.New("test_error")) - q := NewMessagePool(nil, nil, nil, mockCache) + q := NewQueue(nil, nil, nil, mockCache) _, err := q.TimeoutMessages(ctx, "queue_test") require.Error(t, err) } -func TestMessagePoolRemoveShouldRemoveFromCacheAndStorage(t *testing.T) { +func TestQueueRemoveShouldRemoveFromCacheAndStorage(t *testing.T) { t.Parallel() mockCtrl := gomock.NewController(t) @@ -619,7 +618,7 @@ func TestMessagePoolRemoveShouldRemoveFromCacheAndStorage(t *testing.T) { Reason: "", }) - q := NewMessagePool(mockAuditor, mockStorage, nil, mockCache) + q := NewQueue(mockAuditor, mockStorage, nil, mockCache) cacheRemoved, storageRemoved, err := q.Remove(ctx, "queue_test", "", "1", "2") require.NoError(t, err) @@ -627,7 +626,7 @@ func TestMessagePoolRemoveShouldRemoveFromCacheAndStorage(t *testing.T) { require.Equal(t, int64(2), storageRemoved) } -func TestMessagePoolRemoveCacheError(t *testing.T) { +func TestQueueRemoveCacheError(t *testing.T) { t.Parallel() mockCtrl := gomock.NewController(t) @@ -636,7 +635,7 @@ func TestMessagePoolRemoveCacheError(t *testing.T) { mockCache := mocks.NewMockCache(mockCtrl) mockCache.EXPECT().Remove(gomock.Any(), "queue_test", "1").Return(int64(0), errors.New("cache_error")) - q := NewMessagePool(nil, nil, nil, mockCache) + q := NewQueue(nil, nil, nil, mockCache) cacheRemoved, storageRemoved, err := q.Remove(ctx, "queue_test", "", "1") require.Error(t, err) @@ -644,7 +643,7 @@ func TestMessagePoolRemoveCacheError(t *testing.T) { require.Equal(t, int64(0), storageRemoved) } -func TestMessagePoolRemoveStorageError(t *testing.T) { +func TestQueueRemoveStorageError(t *testing.T) { t.Parallel() mockCtrl := gomock.NewController(t) @@ -656,7 +655,7 @@ func TestMessagePoolRemoveStorageError(t *testing.T) { mockStorage := mocks.NewMockStorage(mockCtrl) mockStorage.EXPECT().Remove(gomock.Any(), "queue_test", "1").Return(int64(0), errors.New("storage_error")) - q := NewMessagePool(nil, mockStorage, nil, mockCache) + q := NewQueue(nil, mockStorage, nil, mockCache) cacheRemoved, storageRemoved, err := q.Remove(ctx, "queue_test", "", "1") require.Error(t, err) @@ -664,7 +663,7 @@ func TestMessagePoolRemoveStorageError(t *testing.T) { require.Equal(t, int64(0), storageRemoved) } -func TestMessagePoolTimeout(t *testing.T) { +func TestQueueTimeout(t *testing.T) { t.Parallel() mockCtrl := gomock.NewController(t) @@ -686,7 +685,7 @@ func TestMessagePoolTimeout(t *testing.T) { Queue: "queue_test", }}).Times(1) - q := NewMessagePool(mockAuditor, nil, nil, mockCache) + q := NewQueue(mockAuditor, nil, nil, mockCache) ids, err := q.TimeoutMessages(ctx, "queue_test") @@ -738,7 +737,7 @@ func TestAddMessagesToCacheSameIdInSameRequestShouldSetLastElementScore(t *testi Signal: audit.INSERT_CACHE, }).MinTimes(2) - q := NewMessagePool(mockAuditor, nil, nil, mockCache) + q := NewQueue(mockAuditor, nil, nil, mockCache) messages := []*entities.Message{{ ID: "id", @@ -790,7 +789,7 @@ func TestAddMessagesToStorageWithoutEditingQueueConfiguration(t *testing.T) { mockStorage := mocks.NewMockStorage(mockCtrl) mockStorage.EXPECT().Insert(gomock.Any(), messages).Return(int64(2), int64(0), nil) - q := NewMessagePool(nil, mockStorage, nil, nil) + q := NewQueue(nil, mockStorage, nil, nil) inserted, updated, err := q.AddMessagesToStorage(ctx, messages...) @@ -817,7 +816,7 @@ func TestAddMessagesError(t *testing.T) { }, }).Return(nil, errors.New("insert error")) - q := NewMessagePool(nil, nil, nil, mockCache) + q := NewQueue(nil, nil, nil, mockCache) messages := []*entities.Message{{ ID: "id", @@ -839,7 +838,7 @@ func TestRemoveExceedingMessagesQueueZeroMaxElementsShouldDoNothing(t *testing.T mockStorage := mocks.NewMockStorage(mockCtrl) mockCache := mocks.NewMockCache(mockCtrl) - q := NewMessagePool(&audit.AuditorImpl{}, mockStorage, queue.NewConfigurationService(ctx, mockStorage), mockCache) + q := NewQueue(&audit.AuditorImpl{}, mockStorage, NewQueueConfigurationService(ctx, mockStorage), mockCache) require.NoError(t, q.removeExceedingMessagesFromQueue(ctx, &entities.QueueConfiguration{MaxElements: 0, Queue: "q1"})) } @@ -852,14 +851,14 @@ func TestRemoveExceedingMessagesEmptyQueueShouldDoNothing(t *testing.T) { mockStorage := mocks.NewMockStorage(mockCtrl) - configuration := &entities.QueueConfiguration{MaxElements: 2, Queue: "q1"} + queueConfiguration := &entities.QueueConfiguration{MaxElements: 2, Queue: "q1"} mockStorage.EXPECT().Count(gomock.Any(), &storage.FindOptions{InternalFilter: &storage.InternalFilter{Queue: "q1"}}).Return(int64(0), nil) mockCache := mocks.NewMockCache(mockCtrl) - q := NewMessagePool(&audit.AuditorImpl{}, mockStorage, queue.NewConfigurationService(ctx, mockStorage), mockCache) + q := NewQueue(&audit.AuditorImpl{}, mockStorage, NewQueueConfigurationService(ctx, mockStorage), mockCache) - require.NoError(t, q.removeExceedingMessagesFromQueue(ctx, configuration)) + require.NoError(t, q.removeExceedingMessagesFromQueue(ctx, queueConfiguration)) } func TestRemoveExceedingMessagesErrorCountingShouldReturnError(t *testing.T) { @@ -868,16 +867,16 @@ func TestRemoveExceedingMessagesErrorCountingShouldReturnError(t *testing.T) { mockCtrl := gomock.NewController(t) defer mockCtrl.Finish() - configuration := &entities.QueueConfiguration{MaxElements: 2, Queue: "q1"} + queueConfiguration := &entities.QueueConfiguration{MaxElements: 2, Queue: "q1"} mockStorage := mocks.NewMockStorage(mockCtrl) mockStorage.EXPECT().Count(gomock.Any(), &storage.FindOptions{InternalFilter: &storage.InternalFilter{Queue: "q1"}}).Return(int64(0), fmt.Errorf("anyerr")) mockCache := mocks.NewMockCache(mockCtrl) - q := NewMessagePool(&audit.AuditorImpl{}, mockStorage, queue.NewConfigurationService(ctx, mockStorage), mockCache) + q := NewQueue(&audit.AuditorImpl{}, mockStorage, NewQueueConfigurationService(ctx, mockStorage), mockCache) - require.Error(t, q.removeExceedingMessagesFromQueue(ctx, configuration)) + require.Error(t, q.removeExceedingMessagesFromQueue(ctx, queueConfiguration)) } func TestRemoveExceedingMessagesShouldRemoveExceedingElements(t *testing.T) { @@ -891,7 +890,7 @@ func TestRemoveExceedingMessagesShouldRemoveExceedingElements(t *testing.T) { maxElements := int64(2) count := int64(5) - configuration := &entities.QueueConfiguration{MaxElements: maxElements, Queue: "q1"} + queueConfiguration := &entities.QueueConfiguration{MaxElements: maxElements, Queue: "q1"} mockStorage.EXPECT().Count(gomock.Any(), &storage.FindOptions{InternalFilter: &storage.InternalFilter{Queue: "q1"}}).Return(count, nil) @@ -915,9 +914,9 @@ func TestRemoveExceedingMessagesShouldRemoveExceedingElements(t *testing.T) { mockCache := mocks.NewMockCache(mockCtrl) mockCache.EXPECT().Remove(gomock.Any(), "q1", []string{"1", "2"}).Return(int64(2), nil) - q := NewMessagePool(&audit.AuditorImpl{}, mockStorage, queue.NewConfigurationService(ctx, mockStorage), mockCache) + q := NewQueue(&audit.AuditorImpl{}, mockStorage, NewQueueConfigurationService(ctx, mockStorage), mockCache) - require.NoError(t, q.removeExceedingMessagesFromQueue(ctx, configuration)) + require.NoError(t, q.removeExceedingMessagesFromQueue(ctx, queueConfiguration)) } func TestRemoveExceedingMessagesFindErrorShouldRemoveResultError(t *testing.T) { @@ -931,7 +930,7 @@ func TestRemoveExceedingMessagesFindErrorShouldRemoveResultError(t *testing.T) { maxElements := int64(2) count := int64(5) - configuration := &entities.QueueConfiguration{MaxElements: maxElements, Queue: "q1"} + queueConfiguration := &entities.QueueConfiguration{MaxElements: maxElements, Queue: "q1"} mockStorage.EXPECT().Count(gomock.Any(), &storage.FindOptions{InternalFilter: &storage.InternalFilter{Queue: "q1"}}).Return(count, nil) @@ -942,9 +941,9 @@ func TestRemoveExceedingMessagesFindErrorShouldRemoveResultError(t *testing.T) { mockCache := mocks.NewMockCache(mockCtrl) - q := NewMessagePool(&audit.AuditorImpl{}, mockStorage, queue.NewConfigurationService(ctx, mockStorage), mockCache) + q := NewQueue(&audit.AuditorImpl{}, mockStorage, NewQueueConfigurationService(ctx, mockStorage), mockCache) - require.Error(t, q.removeExceedingMessagesFromQueue(ctx, configuration)) + require.Error(t, q.removeExceedingMessagesFromQueue(ctx, queueConfiguration)) } func TestRemoveExceedingMessagesRemoveErrorShouldResultError(t *testing.T) { @@ -958,7 +957,7 @@ func TestRemoveExceedingMessagesRemoveErrorShouldResultError(t *testing.T) { maxElements := int64(2) count := int64(5) - configuration := &entities.QueueConfiguration{MaxElements: maxElements, Queue: "q1"} + queueConfiguration := &entities.QueueConfiguration{MaxElements: maxElements, Queue: "q1"} mockStorage.EXPECT().Count(gomock.Any(), &storage.FindOptions{InternalFilter: &storage.InternalFilter{Queue: "q1"}}).Return(count, nil) @@ -971,9 +970,9 @@ func TestRemoveExceedingMessagesRemoveErrorShouldResultError(t *testing.T) { mockCache := mocks.NewMockCache(mockCtrl) mockCache.EXPECT().Remove(gomock.Any(), "q1", []string{"1", "2"}).Return(int64(2), nil) - q := NewMessagePool(&audit.AuditorImpl{}, mockStorage, queue.NewConfigurationService(ctx, mockStorage), mockCache) + q := NewQueue(&audit.AuditorImpl{}, mockStorage, NewQueueConfigurationService(ctx, mockStorage), mockCache) - require.Error(t, q.removeExceedingMessagesFromQueue(ctx, configuration)) + require.Error(t, q.removeExceedingMessagesFromQueue(ctx, queueConfiguration)) } func TestCountShouldCallStorage(t *testing.T) { @@ -988,7 +987,7 @@ func TestCountShouldCallStorage(t *testing.T) { mockStorage.EXPECT().Count(ctx, opts).Return(int64(12), nil) mockCache := mocks.NewMockCache(mockCtrl) - q := NewMessagePool(nil, mockStorage, nil, mockCache) + q := NewQueue(nil, mockStorage, nil, mockCache) result, err := q.Count(ctx, opts) @@ -1008,7 +1007,7 @@ func TestNilOptsShouldCreateEmptyOpts(t *testing.T) { mockStorage.EXPECT().Count(ctx, opts).Return(int64(12), nil) mockCache := mocks.NewMockCache(mockCtrl) - q := NewMessagePool(nil, mockStorage, nil, mockCache) + q := NewQueue(nil, mockStorage, nil, mockCache) result, err := q.Count(ctx, nil) @@ -1028,7 +1027,7 @@ func TestCountStorageErrorShouldResultError(t *testing.T) { mockStorage.EXPECT().Count(ctx, opts).Return(int64(0), fmt.Errorf("error")) mockCache := mocks.NewMockCache(mockCtrl) - q := NewMessagePool(nil, mockStorage, nil, mockCache) + q := NewQueue(nil, mockStorage, nil, mockCache) result, err := q.Count(ctx, nil) @@ -1068,7 +1067,7 @@ func TestAckWithLockShouldLock(t *testing.T) { LockMs: 10, }) - q := NewMessagePool(mockAuditor, mockStorage, nil, mockCache) + q := NewQueue(mockAuditor, mockStorage, nil, mockCache) result, err := q.Ack(ctx, &entities.Message{ ID: "id", @@ -1104,7 +1103,7 @@ func TestAckWithLockErrorShouldResultError(t *testing.T) { mockAuditor := mocks.NewMockAuditor(mockCtrl) - q := NewMessagePool(mockAuditor, mockStorage, nil, mockCache) + q := NewQueue(mockAuditor, mockStorage, nil, mockCache) result, err := q.Ack(ctx, &entities.Message{ ID: "id", @@ -1147,7 +1146,7 @@ func TestNackWithLockShouldLock(t *testing.T) { LockMs: 10, }) - q := NewMessagePool(mockAuditor, mockStorage, nil, mockCache) + q := NewQueue(mockAuditor, mockStorage, nil, mockCache) result, err := q.Nack(ctx, &entities.Message{ ID: "id", @@ -1180,7 +1179,7 @@ func TestNackWithLockErrorShouldResultError(t *testing.T) { mockAuditor := mocks.NewMockAuditor(mockCtrl) - q := NewMessagePool(mockAuditor, mockStorage, nil, mockCache) + q := NewQueue(mockAuditor, mockStorage, nil, mockCache) result, err := q.Nack(ctx, &entities.Message{ ID: "id", diff --git a/internal/messagepool/storage/memory_storage.go b/internal/queue/storage/memory_storage.go similarity index 98% rename from internal/messagepool/storage/memory_storage.go rename to internal/queue/storage/memory_storage.go index 26408e1..748819f 100644 --- a/internal/messagepool/storage/memory_storage.go +++ b/internal/queue/storage/memory_storage.go @@ -9,8 +9,8 @@ import ( "sync" "time" - "github.com/takenet/deckard/internal/messagepool/entities" - "github.com/takenet/deckard/internal/messagepool/utils" + "github.com/takenet/deckard/internal/queue/entities" + "github.com/takenet/deckard/internal/queue/utils" ) // MemoryStorage is an implementation of the Storage Interface using memory. diff --git a/internal/messagepool/storage/memory_storage_test.go b/internal/queue/storage/memory_storage_test.go similarity index 93% rename from internal/messagepool/storage/memory_storage_test.go rename to internal/queue/storage/memory_storage_test.go index a2c397d..586b765 100644 --- a/internal/messagepool/storage/memory_storage_test.go +++ b/internal/queue/storage/memory_storage_test.go @@ -9,7 +9,7 @@ import ( "github.com/stretchr/testify/require" "github.com/stretchr/testify/suite" "github.com/takenet/deckard/internal/config" - "github.com/takenet/deckard/internal/messagepool/entities" + "github.com/takenet/deckard/internal/queue/entities" ) func TestMemoryStorage(t *testing.T) { diff --git a/internal/messagepool/storage/mongo_storage.go b/internal/queue/storage/mongo_storage.go similarity index 99% rename from internal/messagepool/storage/mongo_storage.go rename to internal/queue/storage/mongo_storage.go index f0923c2..b74bfb2 100644 --- a/internal/messagepool/storage/mongo_storage.go +++ b/internal/queue/storage/mongo_storage.go @@ -12,10 +12,10 @@ import ( "github.com/elliotchance/orderedmap/v2" "github.com/takenet/deckard/internal/config" "github.com/takenet/deckard/internal/logger" - "github.com/takenet/deckard/internal/messagepool/entities" - "github.com/takenet/deckard/internal/messagepool/utils" "github.com/takenet/deckard/internal/metrics" "github.com/takenet/deckard/internal/project" + "github.com/takenet/deckard/internal/queue/entities" + "github.com/takenet/deckard/internal/queue/utils" "go.mongodb.org/mongo-driver/bson" "go.mongodb.org/mongo-driver/bson/primitive" "go.mongodb.org/mongo-driver/mongo" diff --git a/internal/messagepool/storage/mongo_storage_test.go b/internal/queue/storage/mongo_storage_test.go similarity index 98% rename from internal/messagepool/storage/mongo_storage_test.go rename to internal/queue/storage/mongo_storage_test.go index 32c61dc..2531beb 100644 --- a/internal/messagepool/storage/mongo_storage_test.go +++ b/internal/queue/storage/mongo_storage_test.go @@ -9,7 +9,7 @@ import ( "github.com/stretchr/testify/require" "github.com/stretchr/testify/suite" "github.com/takenet/deckard/internal/config" - "github.com/takenet/deckard/internal/messagepool/entities" + "github.com/takenet/deckard/internal/queue/entities" "go.mongodb.org/mongo-driver/bson" "go.mongodb.org/mongo-driver/bson/primitive" ) diff --git a/internal/messagepool/storage/storage.go b/internal/queue/storage/storage.go similarity index 97% rename from internal/messagepool/storage/storage.go rename to internal/queue/storage/storage.go index 221c37f..bd858d5 100644 --- a/internal/messagepool/storage/storage.go +++ b/internal/queue/storage/storage.go @@ -9,7 +9,7 @@ import ( "github.com/elliotchance/orderedmap/v2" "github.com/takenet/deckard/internal/logger" - "github.com/takenet/deckard/internal/messagepool/entities" + "github.com/takenet/deckard/internal/queue/entities" ) type Type string diff --git a/internal/messagepool/storage/storage_suite_test.go b/internal/queue/storage/storage_suite_test.go similarity index 99% rename from internal/messagepool/storage/storage_suite_test.go rename to internal/queue/storage/storage_suite_test.go index 960d4c1..aa900dc 100644 --- a/internal/messagepool/storage/storage_suite_test.go +++ b/internal/queue/storage/storage_suite_test.go @@ -10,8 +10,8 @@ import ( "github.com/stretchr/testify/require" "github.com/stretchr/testify/suite" "github.com/takenet/deckard" - "github.com/takenet/deckard/internal/messagepool/entities" - "github.com/takenet/deckard/internal/messagepool/utils" + "github.com/takenet/deckard/internal/queue/entities" + "github.com/takenet/deckard/internal/queue/utils" "google.golang.org/protobuf/types/known/anypb" "google.golang.org/protobuf/types/known/wrapperspb" ) diff --git a/internal/messagepool/utils/glob_pattern.go b/internal/queue/utils/glob_pattern.go similarity index 100% rename from internal/messagepool/utils/glob_pattern.go rename to internal/queue/utils/glob_pattern.go diff --git a/internal/messagepool/utils/glob_pattern_test.go b/internal/queue/utils/glob_pattern_test.go similarity index 100% rename from internal/messagepool/utils/glob_pattern_test.go rename to internal/queue/utils/glob_pattern_test.go diff --git a/internal/messagepool/utils/math_helper.go b/internal/queue/utils/math_helper.go similarity index 100% rename from internal/messagepool/utils/math_helper.go rename to internal/queue/utils/math_helper.go diff --git a/internal/messagepool/utils/math_helper_test.go b/internal/queue/utils/math_helper_test.go similarity index 100% rename from internal/messagepool/utils/math_helper_test.go rename to internal/queue/utils/math_helper_test.go diff --git a/internal/messagepool/utils/primitive_conversion.go b/internal/queue/utils/primitive_conversion.go similarity index 100% rename from internal/messagepool/utils/primitive_conversion.go rename to internal/queue/utils/primitive_conversion.go diff --git a/internal/messagepool/utils/primitive_conversion_test.go b/internal/queue/utils/primitive_conversion_test.go similarity index 100% rename from internal/messagepool/utils/primitive_conversion_test.go rename to internal/queue/utils/primitive_conversion_test.go diff --git a/internal/service/deckard_service.go b/internal/service/deckard_service.go index e92a9f9..0150540 100644 --- a/internal/service/deckard_service.go +++ b/internal/service/deckard_service.go @@ -13,10 +13,9 @@ import ( "github.com/takenet/deckard" "github.com/takenet/deckard/internal/config" "github.com/takenet/deckard/internal/logger" - "github.com/takenet/deckard/internal/messagepool" - "github.com/takenet/deckard/internal/messagepool/entities" - "github.com/takenet/deckard/internal/messagepool/queue" - "github.com/takenet/deckard/internal/messagepool/storage" + "github.com/takenet/deckard/internal/queue" + "github.com/takenet/deckard/internal/queue/entities" + "github.com/takenet/deckard/internal/queue/storage" "go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc" "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/trace" @@ -31,8 +30,8 @@ import ( ) type Deckard struct { - pool messagepool.DeckardMessagePool - queueConfigurationService queue.ConfigurationService + pool queue.DeckardQueue + queueConfigurationService queue.QueueConfigurationService memoryInstance bool healthServer *health.Server @@ -43,7 +42,7 @@ type Deckard struct { var _ deckard.DeckardServer = (*Deckard)(nil) -func NewDeckardInstance(qpool messagepool.DeckardMessagePool, queueConfigurationService queue.ConfigurationService, memoryInstance bool) *Deckard { +func NewDeckardInstance(qpool queue.DeckardQueue, queueConfigurationService queue.QueueConfigurationService, memoryInstance bool) *Deckard { return &Deckard{ pool: qpool, queueConfigurationService: queueConfigurationService, @@ -53,12 +52,12 @@ func NewDeckardInstance(qpool messagepool.DeckardMessagePool, queueConfiguration } // Creates a memory deckard service -func NewMemoryDeckardService(qpool messagepool.DeckardMessagePool, queueConfigurationService queue.ConfigurationService) *Deckard { +func NewMemoryDeckardService(qpool queue.DeckardQueue, queueConfigurationService queue.QueueConfigurationService) *Deckard { return NewDeckardInstance(qpool, queueConfigurationService, true) } // Creates a non-memory deckard service -func NewDeckardService(qpool messagepool.DeckardMessagePool, queueConfigurationService queue.ConfigurationService) *Deckard { +func NewDeckardService(qpool queue.DeckardQueue, queueConfigurationService queue.QueueConfigurationService) *Deckard { return NewDeckardInstance(qpool, queueConfigurationService, false) } diff --git a/internal/service/deckard_service_suite_test.go b/internal/service/deckard_service_suite_test.go index f7a3b36..0bb0f6a 100644 --- a/internal/service/deckard_service_suite_test.go +++ b/internal/service/deckard_service_suite_test.go @@ -7,18 +7,18 @@ import ( "github.com/stretchr/testify/require" "github.com/stretchr/testify/suite" "github.com/takenet/deckard" - "github.com/takenet/deckard/internal/messagepool" - "github.com/takenet/deckard/internal/messagepool/cache" - "github.com/takenet/deckard/internal/messagepool/entities" - "github.com/takenet/deckard/internal/messagepool/storage" + "github.com/takenet/deckard/internal/queue" + "github.com/takenet/deckard/internal/queue/cache" + "github.com/takenet/deckard/internal/queue/entities" + "github.com/takenet/deckard/internal/queue/storage" ) type DeckardIntegrationTestSuite struct { suite.Suite - deckard deckard.DeckardServer - deckardMessagePool messagepool.DeckardMessagePool - deckardCache cache.Cache - deckardStorage storage.Storage + deckard deckard.DeckardServer + deckardQueue queue.DeckardQueue + deckardCache cache.Cache + deckardStorage storage.Storage } func (suite *DeckardIntegrationTestSuite) AfterTest(_, _ string) { @@ -47,7 +47,7 @@ func (suite *DeckardIntegrationTestSuite) TestAddMessageIntegration() { require.NoError(suite.T(), err) require.Equal(suite.T(), int64(1), response.CreatedCount) - messages, err := suite.deckardMessagePool.GetStorageMessages(ctx, &storage.FindOptions{ + messages, err := suite.deckardQueue.GetStorageMessages(ctx, &storage.FindOptions{ InternalFilter: &storage.InternalFilter{ Ids: &[]string{"123"}, }, @@ -97,13 +97,13 @@ func (suite *DeckardIntegrationTestSuite) TestGetMessageIntegration() { require.NoError(suite.T(), err) - count, err := suite.deckardMessagePool.Count(ctx, nil) + count, err := suite.deckardQueue.Count(ctx, nil) require.NoError(suite.T(), err) require.Equal(suite.T(), int64(1), count) require.Equal(suite.T(), int64(1), res.CreatedCount) - messages, err := suite.deckardMessagePool.GetStorageMessages(ctx, &storage.FindOptions{ + messages, err := suite.deckardQueue.GetStorageMessages(ctx, &storage.FindOptions{ InternalFilter: &storage.InternalFilter{ Ids: &[]string{"123"}, }, @@ -140,7 +140,7 @@ func (suite *DeckardIntegrationTestSuite) TestGetMessageShouldResultMostScoreFir require.Equal(suite.T(), int64(3), response.CreatedCount) require.Equal(suite.T(), int64(0), response.UpdatedCount) - count, err := suite.deckardMessagePool.Count(ctx, nil) + count, err := suite.deckardQueue.Count(ctx, nil) require.Equal(suite.T(), int64(3), count) require.NoError(suite.T(), err) @@ -223,7 +223,7 @@ func testMessageRemoval(suite *DeckardIntegrationTestSuite, ackOrNack AckNackAct require.NoError(suite.T(), err) require.Equal(suite.T(), int64(1), response.CreatedCount) - messages, err := suite.deckardMessagePool.GetStorageMessages(ctx, &storage.FindOptions{ + messages, err := suite.deckardQueue.GetStorageMessages(ctx, &storage.FindOptions{ InternalFilter: &storage.InternalFilter{ Ids: &[]string{"-1"}, }, diff --git a/internal/service/deckard_service_test.go b/internal/service/deckard_service_test.go index 2755b89..54f6af3 100644 --- a/internal/service/deckard_service_test.go +++ b/internal/service/deckard_service_test.go @@ -17,12 +17,11 @@ import ( "github.com/takenet/deckard" "github.com/takenet/deckard/internal/audit" "github.com/takenet/deckard/internal/config" - "github.com/takenet/deckard/internal/messagepool" - "github.com/takenet/deckard/internal/messagepool/cache" - "github.com/takenet/deckard/internal/messagepool/entities" - "github.com/takenet/deckard/internal/messagepool/queue" - "github.com/takenet/deckard/internal/messagepool/storage" "github.com/takenet/deckard/internal/mocks" + "github.com/takenet/deckard/internal/queue" + "github.com/takenet/deckard/internal/queue/cache" + "github.com/takenet/deckard/internal/queue/entities" + "github.com/takenet/deckard/internal/queue/storage" "google.golang.org/grpc" "google.golang.org/grpc/credentials" "google.golang.org/grpc/credentials/insecure" @@ -42,11 +41,11 @@ func TestMemoryDeckardGRPCServeIntegration(t *testing.T) { storage := storage.NewMemoryStorage(ctx) cache := cache.NewMemoryCache() - queueService := queue.NewConfigurationService(ctx, storage) + queueService := queue.NewQueueConfigurationService(ctx, storage) - messagePool := messagepool.NewMessagePool(&audit.AuditorImpl{}, storage, queueService, cache) + queue := queue.NewQueue(&audit.AuditorImpl{}, storage, queueService, cache) - srv := NewMemoryDeckardService(messagePool, queueService) + srv := NewMemoryDeckardService(queue, queueService) server, err := srv.ServeGRPCServer(ctx) require.NoError(t, err) @@ -97,11 +96,11 @@ func TestDeckardServerTLS(t *testing.T) { storage := storage.NewMemoryStorage(ctx) cache := cache.NewMemoryCache() - queueService := queue.NewConfigurationService(ctx, storage) + queueService := queue.NewQueueConfigurationService(ctx, storage) - messagePool := messagepool.NewMessagePool(&audit.AuditorImpl{}, storage, queueService, cache) + queue := queue.NewQueue(&audit.AuditorImpl{}, storage, queueService, cache) - srv := NewMemoryDeckardService(messagePool, queueService) + srv := NewMemoryDeckardService(queue, queueService) server, err := srv.ServeGRPCServer(ctx) require.NoError(t, err) @@ -152,11 +151,11 @@ func TestDeckardMutualTLS(t *testing.T) { storage := storage.NewMemoryStorage(ctx) cache := cache.NewMemoryCache() - queueService := queue.NewConfigurationService(ctx, storage) + queueService := queue.NewQueueConfigurationService(ctx, storage) - messagePool := messagepool.NewMessagePool(&audit.AuditorImpl{}, storage, queueService, cache) + queue := queue.NewQueue(&audit.AuditorImpl{}, storage, queueService, cache) - srv := NewMemoryDeckardService(messagePool, queueService) + srv := NewMemoryDeckardService(queue, queueService) server, err := srv.ServeGRPCServer(ctx) require.NoError(t, err) @@ -201,17 +200,17 @@ func TestMemoryDeckardIntegration(t *testing.T) { storage := storage.NewMemoryStorage(ctx) cache := cache.NewMemoryCache() - queueService := queue.NewConfigurationService(ctx, storage) + queueService := queue.NewQueueConfigurationService(ctx, storage) - messagePool := messagepool.NewMessagePool(&audit.AuditorImpl{}, storage, queueService, cache) + queue := queue.NewQueue(&audit.AuditorImpl{}, storage, queueService, cache) - srv := NewMemoryDeckardService(messagePool, queueService) + srv := NewMemoryDeckardService(queue, queueService) suite.Run(t, &DeckardIntegrationTestSuite{ - deckard: srv, - deckardMessagePool: messagePool, - deckardCache: cache, - deckardStorage: storage, + deckard: srv, + deckardQueue: queue, + deckardCache: cache, + deckardStorage: storage, }) } @@ -232,17 +231,17 @@ func TestRedisAndMongoDeckardIntegration(t *testing.T) { require.NoError(t, err) - queueService := queue.NewConfigurationService(ctx, storage) + queueService := queue.NewQueueConfigurationService(ctx, storage) - messagePool := messagepool.NewMessagePool(&audit.AuditorImpl{}, storage, queueService, cache) + queue := queue.NewQueue(&audit.AuditorImpl{}, storage, queueService, cache) - srv := NewDeckardService(messagePool, queueService) + srv := NewDeckardService(queue, queueService) suite.Run(t, &DeckardIntegrationTestSuite{ - deckard: srv, - deckardMessagePool: messagePool, - deckardCache: cache, - deckardStorage: storage, + deckard: srv, + deckardQueue: queue, + deckardCache: cache, + deckardStorage: storage, }) } @@ -254,11 +253,11 @@ func TestFlushMemoryDeckardIntegration(t *testing.T) { storage := storage.NewMemoryStorage(ctx) cache := cache.NewMemoryCache() - queueService := queue.NewConfigurationService(ctx, storage) + queueService := queue.NewQueueConfigurationService(ctx, storage) - messagePool := messagepool.NewMessagePool(&audit.AuditorImpl{}, storage, queueService, cache) + queue := queue.NewQueue(&audit.AuditorImpl{}, storage, queueService, cache) - srv := NewMemoryDeckardService(messagePool, queueService) + srv := NewMemoryDeckardService(queue, queueService) _, err := srv.Add(ctx, &deckard.AddRequest{ Messages: []*deckard.AddMessage{ @@ -273,7 +272,7 @@ func TestFlushMemoryDeckardIntegration(t *testing.T) { }) require.NoError(t, err) - count, err := messagePool.Count(ctx, nil) + count, err := queue.Count(ctx, nil) require.NoError(t, err) require.Equal(t, int64(1), count) @@ -281,7 +280,7 @@ func TestFlushMemoryDeckardIntegration(t *testing.T) { require.NoError(t, err) require.True(t, result.Success) - count, err = messagePool.Count(ctx, nil) + count, err = queue.Count(ctx, nil) require.NoError(t, err) require.Equal(t, int64(0), count) } @@ -295,19 +294,19 @@ func TestFlushOnNonMemoryDeckardShouldNotSuccess(t *testing.T) { require.False(t, result.Success) } -func TestGetMessagePoolError(t *testing.T) { +func TestGetQueueError(t *testing.T) { mockCtrl := gomock.NewController(t) defer mockCtrl.Finish() - mockMessagePool := mocks.NewMockDeckardMessagePool(mockCtrl) - mockMessagePool.EXPECT().Pull( + mockQueue := mocks.NewMockDeckardQueue(mockCtrl) + mockQueue.EXPECT().Pull( ctx, "queue", int64(1000), int64(34), ).Return(nil, errors.New("pool error")) - _, err := NewDeckardService(mockMessagePool, nil).Pull(ctx, &deckard.PullRequest{ + _, err := NewDeckardService(mockQueue, nil).Pull(ctx, &deckard.PullRequest{ Queue: "queue", Amount: 1234, ScoreFilter: 34, @@ -316,19 +315,19 @@ func TestGetMessagePoolError(t *testing.T) { require.Error(t, err) } -func TestGetMessagePoolNoMessages(t *testing.T) { +func TestGetQueueNoMessages(t *testing.T) { mockCtrl := gomock.NewController(t) defer mockCtrl.Finish() - mockMessagePool := mocks.NewMockDeckardMessagePool(mockCtrl) - mockMessagePool.EXPECT().Pull( + mockQueue := mocks.NewMockDeckardQueue(mockCtrl) + mockQueue.EXPECT().Pull( ctx, "queue", int64(1000), int64(34), ).Return(nil, nil) - response, err := NewDeckardService(mockMessagePool, nil).Pull(ctx, &deckard.PullRequest{ + response, err := NewDeckardService(mockQueue, nil).Pull(ctx, &deckard.PullRequest{ Queue: "queue", Amount: 1234, ScoreFilter: 34, @@ -342,8 +341,8 @@ func TestAck(t *testing.T) { mockCtrl := gomock.NewController(t) defer mockCtrl.Finish() - mockMessagePool := mocks.NewMockDeckardMessagePool(mockCtrl) - mockMessagePool.EXPECT().Ack( + mockQueue := mocks.NewMockDeckardQueue(mockCtrl) + mockQueue.EXPECT().Ack( ctx, &entities.Message{ ID: "1234567", @@ -355,7 +354,7 @@ func TestAck(t *testing.T) { "reason_test", ).Return(true, nil) - response, err := NewDeckardService(mockMessagePool, nil).Ack(ctx, &deckard.AckRequest{ + response, err := NewDeckardService(mockQueue, nil).Ack(ctx, &deckard.AckRequest{ Id: "1234567", Queue: "queue", Reason: "reason_test", @@ -371,8 +370,8 @@ func TestAckPoolError(t *testing.T) { mockCtrl := gomock.NewController(t) defer mockCtrl.Finish() - mockMessagePool := mocks.NewMockDeckardMessagePool(mockCtrl) - mockMessagePool.EXPECT().Ack( + mockQueue := mocks.NewMockDeckardQueue(mockCtrl) + mockQueue.EXPECT().Ack( ctx, &entities.Message{ ID: "1234567", @@ -384,7 +383,7 @@ func TestAckPoolError(t *testing.T) { "reason_test", ).Return(false, errors.New("pool error")) - result, err := NewDeckardService(mockMessagePool, nil).Ack(ctx, &deckard.AckRequest{ + result, err := NewDeckardService(mockQueue, nil).Ack(ctx, &deckard.AckRequest{ Id: "1234567", Queue: "queue", Reason: "reason_test", @@ -400,8 +399,8 @@ func TestNack(t *testing.T) { mockCtrl := gomock.NewController(t) defer mockCtrl.Finish() - mockMessagePool := mocks.NewMockDeckardMessagePool(mockCtrl) - mockMessagePool.EXPECT().Nack( + mockQueue := mocks.NewMockDeckardQueue(mockCtrl) + mockQueue.EXPECT().Nack( ctx, &entities.Message{ ID: "1234567", @@ -413,7 +412,7 @@ func TestNack(t *testing.T) { "reason_test", ).Return(true, nil) - response, err := NewDeckardService(mockMessagePool, nil).Nack(ctx, &deckard.AckRequest{ + response, err := NewDeckardService(mockQueue, nil).Nack(ctx, &deckard.AckRequest{ Id: "1234567", Queue: "queue", Reason: "reason_test", @@ -429,8 +428,8 @@ func TestNackPoolError(t *testing.T) { mockCtrl := gomock.NewController(t) defer mockCtrl.Finish() - mockMessagePool := mocks.NewMockDeckardMessagePool(mockCtrl) - mockMessagePool.EXPECT().Nack( + mockQueue := mocks.NewMockDeckardQueue(mockCtrl) + mockQueue.EXPECT().Nack( ctx, &entities.Message{ ID: "1234567", @@ -442,7 +441,7 @@ func TestNackPoolError(t *testing.T) { "reason_test", ).Return(false, errors.New("pool error")) - response, err := NewDeckardService(mockMessagePool, nil).Nack(ctx, &deckard.AckRequest{ + response, err := NewDeckardService(mockQueue, nil).Nack(ctx, &deckard.AckRequest{ Id: "1234567", Queue: "queue", Reason: "reason_test", @@ -458,15 +457,15 @@ func TestCountMessage(t *testing.T) { mockCtrl := gomock.NewController(t) defer mockCtrl.Finish() - mockMessagePool := mocks.NewMockDeckardMessagePool(mockCtrl) + mockQueue := mocks.NewMockDeckardQueue(mockCtrl) - mockMessagePool.EXPECT().Count(ctx, &storage.FindOptions{ + mockQueue.EXPECT().Count(ctx, &storage.FindOptions{ InternalFilter: &storage.InternalFilter{ Queue: "queue", }, }).Return(int64(543), nil) - response, err := NewDeckardService(mockMessagePool, nil).Count(ctx, &deckard.CountRequest{ + response, err := NewDeckardService(mockQueue, nil).Count(ctx, &deckard.CountRequest{ Queue: "queue", }) @@ -478,15 +477,15 @@ func TestCountMessageStorageError(t *testing.T) { mockCtrl := gomock.NewController(t) defer mockCtrl.Finish() - mockMessagePool := mocks.NewMockDeckardMessagePool(mockCtrl) + mockQueue := mocks.NewMockDeckardQueue(mockCtrl) - mockMessagePool.EXPECT().Count(ctx, &storage.FindOptions{ + mockQueue.EXPECT().Count(ctx, &storage.FindOptions{ InternalFilter: &storage.InternalFilter{ Queue: "queue", }, }).Return(int64(0), errors.New("storage error")) - _, err := NewDeckardService(mockMessagePool, nil).Count(ctx, &deckard.CountRequest{ + _, err := NewDeckardService(mockQueue, nil).Count(ctx, &deckard.CountRequest{ Queue: "queue", }) @@ -497,10 +496,10 @@ func TestRemoveMessage(t *testing.T) { mockCtrl := gomock.NewController(t) defer mockCtrl.Finish() - mockMessagePool := mocks.NewMockDeckardMessagePool(mockCtrl) - mockMessagePool.EXPECT().Remove(ctx, "queue", "REQUEST", []string{"1", "2", "3"}).Return(int64(3), int64(2), nil) + mockQueue := mocks.NewMockDeckardQueue(mockCtrl) + mockQueue.EXPECT().Remove(ctx, "queue", "REQUEST", []string{"1", "2", "3"}).Return(int64(3), int64(2), nil) - response, err := NewDeckardService(mockMessagePool, nil).Remove(ctx, &deckard.RemoveRequest{ + response, err := NewDeckardService(mockQueue, nil).Remove(ctx, &deckard.RemoveRequest{ Queue: "queue", Ids: []string{"1", "2", "3"}, }) @@ -510,14 +509,14 @@ func TestRemoveMessage(t *testing.T) { require.Equal(t, int64(2), response.GetStorageRemoved()) } -func TestRemoveMessagePoolError(t *testing.T) { +func TestRemoveQueueError(t *testing.T) { mockCtrl := gomock.NewController(t) defer mockCtrl.Finish() - mockMessagePool := mocks.NewMockDeckardMessagePool(mockCtrl) - mockMessagePool.EXPECT().Remove(ctx, "queue", "REQUEST", []string{"1", "2", "3"}).Return(int64(0), int64(0), errors.New("pool error")) + mockQueue := mocks.NewMockDeckardQueue(mockCtrl) + mockQueue.EXPECT().Remove(ctx, "queue", "REQUEST", []string{"1", "2", "3"}).Return(int64(0), int64(0), errors.New("pool error")) - _, err := NewDeckardService(mockMessagePool, nil).Remove(ctx, &deckard.RemoveRequest{ + _, err := NewDeckardService(mockQueue, nil).Remove(ctx, &deckard.RemoveRequest{ Queue: "queue", Ids: []string{"1", "2", "3"}, }) @@ -529,8 +528,8 @@ func TestRemoveMessageRequestWithoutIds(t *testing.T) { mockCtrl := gomock.NewController(t) defer mockCtrl.Finish() - mockMessagePool := mocks.NewMockDeckardMessagePool(mockCtrl) - response, err := NewDeckardService(mockMessagePool, nil).Remove(ctx, &deckard.RemoveRequest{ + mockQueue := mocks.NewMockDeckardQueue(mockCtrl) + response, err := NewDeckardService(mockQueue, nil).Remove(ctx, &deckard.RemoveRequest{ Queue: "queue", Ids: []string{}, }) @@ -544,9 +543,9 @@ func TestGetMessageByIdInvalidId(t *testing.T) { mockCtrl := gomock.NewController(t) defer mockCtrl.Finish() - mockMessagePool := mocks.NewMockDeckardMessagePool(mockCtrl) + mockQueue := mocks.NewMockDeckardQueue(mockCtrl) - _, err := NewDeckardService(mockMessagePool, nil).GetById(ctx, &deckard.GetByIdRequest{ + _, err := NewDeckardService(mockQueue, nil).GetById(ctx, &deckard.GetByIdRequest{ Queue: "queue", Id: "", }) @@ -558,9 +557,9 @@ func TestGetMessageByIdInvalidQueue(t *testing.T) { mockCtrl := gomock.NewController(t) defer mockCtrl.Finish() - mockMessagePool := mocks.NewMockDeckardMessagePool(mockCtrl) + mockQueue := mocks.NewMockDeckardQueue(mockCtrl) - _, err := NewDeckardService(mockMessagePool, nil).GetById(ctx, &deckard.GetByIdRequest{ + _, err := NewDeckardService(mockQueue, nil).GetById(ctx, &deckard.GetByIdRequest{ Queue: "", Id: "fasdfads", }) @@ -572,9 +571,9 @@ func TestGetMessageById(t *testing.T) { mockCtrl := gomock.NewController(t) defer mockCtrl.Finish() - mockMessagePool := mocks.NewMockDeckardMessagePool(mockCtrl) + mockQueue := mocks.NewMockDeckardQueue(mockCtrl) - mockMessagePool.EXPECT().GetStorageMessages(ctx, &storage.FindOptions{ + mockQueue.EXPECT().GetStorageMessages(ctx, &storage.FindOptions{ Limit: 1, InternalFilter: &storage.InternalFilter{ Ids: &[]string{"123"}, @@ -590,7 +589,7 @@ func TestGetMessageById(t *testing.T) { }, }}, nil) - response, err := NewDeckardService(mockMessagePool, nil).GetById(ctx, &deckard.GetByIdRequest{ + response, err := NewDeckardService(mockQueue, nil).GetById(ctx, &deckard.GetByIdRequest{ Queue: "queue", Id: "123", }) @@ -612,9 +611,9 @@ func TestGetMessageByIdNotFound(t *testing.T) { mockCtrl := gomock.NewController(t) defer mockCtrl.Finish() - mockMessagePool := mocks.NewMockDeckardMessagePool(mockCtrl) + mockQueue := mocks.NewMockDeckardQueue(mockCtrl) - mockMessagePool.EXPECT().GetStorageMessages(ctx, &storage.FindOptions{ + mockQueue.EXPECT().GetStorageMessages(ctx, &storage.FindOptions{ Limit: 1, InternalFilter: &storage.InternalFilter{ Ids: &[]string{"123"}, @@ -622,7 +621,7 @@ func TestGetMessageByIdNotFound(t *testing.T) { }, }).Return(nil, nil) - response, err := NewDeckardService(mockMessagePool, nil).GetById(ctx, &deckard.GetByIdRequest{ + response, err := NewDeckardService(mockQueue, nil).GetById(ctx, &deckard.GetByIdRequest{ Queue: "queue", Id: "123", }) @@ -636,9 +635,9 @@ func TestGetMessageByIdStorageError(t *testing.T) { mockCtrl := gomock.NewController(t) defer mockCtrl.Finish() - mockMessagePool := mocks.NewMockDeckardMessagePool(mockCtrl) + mockQueue := mocks.NewMockDeckardQueue(mockCtrl) - mockMessagePool.EXPECT().GetStorageMessages(ctx, &storage.FindOptions{ + mockQueue.EXPECT().GetStorageMessages(ctx, &storage.FindOptions{ Limit: 1, InternalFilter: &storage.InternalFilter{ Ids: &[]string{"123"}, @@ -646,7 +645,7 @@ func TestGetMessageByIdStorageError(t *testing.T) { }, }).Return(nil, errors.New("storage error")) - _, err := NewDeckardService(mockMessagePool, nil).GetById(ctx, &deckard.GetByIdRequest{ + _, err := NewDeckardService(mockQueue, nil).GetById(ctx, &deckard.GetByIdRequest{ Queue: "queue", Id: "123", })