Skip to content

Commit

Permalink
refactor: rename messagepool package to queue (#28)
Browse files Browse the repository at this point in the history
  • Loading branch information
lucasoares authored Jun 28, 2023
1 parent 802b22c commit 87bcec3
Show file tree
Hide file tree
Showing 37 changed files with 330 additions and 336 deletions.
2 changes: 1 addition & 1 deletion CONTRIBUTING.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ deckard # Root folder with project files and Golang service/
│ ├── cmd # Executable main file for the Deckard service
│ ├── config # Configuration variables managed by viper
│ ├── logger # Logging configuration
│ ├── messagepool # Contains all main implementation files and housekeeping program logic
│ ├── queue # Contains all queue implementation files and housekeeping program logic
│ │ ├── cache # Caching implementation
│ │ ├── entities # Message and QueueConfiguration internal definitions
│ │ ├── queue # Queue definition
Expand Down
4 changes: 2 additions & 2 deletions internal/audit/audit.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,10 @@ import (

"github.com/takenet/deckard/internal/config"
"github.com/takenet/deckard/internal/logger"
"github.com/takenet/deckard/internal/messagepool/entities"
"github.com/takenet/deckard/internal/messagepool/utils"
"github.com/takenet/deckard/internal/metrics"
"github.com/takenet/deckard/internal/project"
"github.com/takenet/deckard/internal/queue/entities"
"github.com/takenet/deckard/internal/queue/utils"
"go.uber.org/zap"

"github.com/elastic/go-elasticsearch/v7"
Expand Down
35 changes: 17 additions & 18 deletions internal/cmd/deckard/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,11 @@ import (
"github.com/takenet/deckard/internal/audit"
"github.com/takenet/deckard/internal/config"
"github.com/takenet/deckard/internal/logger"
"github.com/takenet/deckard/internal/messagepool"
"github.com/takenet/deckard/internal/messagepool/cache"
"github.com/takenet/deckard/internal/messagepool/queue"
"github.com/takenet/deckard/internal/messagepool/storage"
"github.com/takenet/deckard/internal/messagepool/utils"
"github.com/takenet/deckard/internal/metrics"
"github.com/takenet/deckard/internal/queue"
"github.com/takenet/deckard/internal/queue/cache"
"github.com/takenet/deckard/internal/queue/storage"
"github.com/takenet/deckard/internal/queue/utils"
"github.com/takenet/deckard/internal/service"
"github.com/takenet/deckard/internal/shutdown"
"github.com/takenet/deckard/internal/trace"
Expand Down Expand Up @@ -88,16 +87,16 @@ func main() {
}
go auditor.StartSender(ctx)

queueService := queue.NewConfigurationService(ctx, dataStorage)
configurationService := queue.NewQueueConfigurationService(ctx, dataStorage)

messagePool := messagepool.NewMessagePool(auditor, dataStorage, queueService, dataCache)
queue := queue.NewQueue(auditor, dataStorage, configurationService, dataCache)

if config.GrpcEnabled.GetBool() {
server = startGrpcServer(messagePool, queueService)
server = startGrpcServer(queue, configurationService)
}

if config.HousekeeperEnabled.GetBool() {
startHouseKeeperJobs(messagePool)
startHouseKeeperJobs(queue)
}

// Handle sigterm and await termChan signal
Expand All @@ -113,8 +112,8 @@ func isMemoryInstance() bool {
return config.CacheType.Get() == string(cache.MEMORY) && config.StorageType.Get() == string(storage.MEMORY)
}

func startGrpcServer(messagepool *messagepool.MessagePool, queueService queue.ConfigurationService) *grpc.Server {
deckard := service.NewDeckardInstance(messagepool, queueService, isMemoryInstance())
func startGrpcServer(queue *queue.Queue, queueService queue.QueueConfigurationService) *grpc.Server {
deckard := service.NewDeckardInstance(queue, queueService, isMemoryInstance())

server, err := deckard.ServeGRPCServer(ctx)
if err != nil {
Expand All @@ -125,14 +124,14 @@ func startGrpcServer(messagepool *messagepool.MessagePool, queueService queue.Co
return server
}

func startHouseKeeperJobs(pool *messagepool.MessagePool) {
func startHouseKeeperJobs(pool *queue.Queue) {
go scheduleTask(
UNLOCK,
nil,
shutdown.WaitGroup,
config.HousekeeperTaskUnlockDelay.GetDuration(),
func() bool {
messagepool.ProcessLockPool(ctx, pool)
queue.ProcessLockPool(ctx, pool)

return true
},
Expand All @@ -144,7 +143,7 @@ func startHouseKeeperJobs(pool *messagepool.MessagePool) {
shutdown.WaitGroup,
config.HousekeeperTaskTimeoutDelay.GetDuration(),
func() bool {
_ = messagepool.ProcessTimeoutMessages(ctx, pool)
_ = queue.ProcessTimeoutMessages(ctx, pool)

return true
},
Expand All @@ -156,7 +155,7 @@ func startHouseKeeperJobs(pool *messagepool.MessagePool) {
shutdown.WaitGroup,
config.HousekeeperTaskMetricsDelay.GetDuration(),
func() bool {
messagepool.ComputeMetrics(ctx, pool)
queue.ComputeMetrics(ctx, pool)

return true
},
Expand All @@ -168,7 +167,7 @@ func startHouseKeeperJobs(pool *messagepool.MessagePool) {
shutdown.CriticalWaitGroup,
config.HousekeeperTaskUpdateDelay.GetDuration(),
func() bool {
return messagepool.RecoveryMessagesPool(ctx, pool)
return queue.RecoveryMessagesPool(ctx, pool)
},
)

Expand All @@ -178,7 +177,7 @@ func startHouseKeeperJobs(pool *messagepool.MessagePool) {
shutdown.WaitGroup,
config.HousekeeperTaskMaxElementsDelay.GetDuration(),
func() bool {
metrify, _ := messagepool.RemoveExceedingMessages(ctx, pool)
metrify, _ := queue.RemoveExceedingMessages(ctx, pool)

return metrify
},
Expand All @@ -192,7 +191,7 @@ func startHouseKeeperJobs(pool *messagepool.MessagePool) {
func() bool {
now := time.Now()

metrify, _ := messagepool.RemoveTTLMessages(ctx, pool, &now)
metrify, _ := queue.RemoveTTLMessages(ctx, pool, &now)

return metrify
},
Expand Down
32 changes: 16 additions & 16 deletions internal/metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@ import (
dto "github.com/prometheus/client_model/go"
"github.com/takenet/deckard/internal/config"
"github.com/takenet/deckard/internal/logger"
"github.com/takenet/deckard/internal/messagepool/utils"
"github.com/takenet/deckard/internal/project"
"github.com/takenet/deckard/internal/queue/utils"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/exporters/prometheus"
otelmetric "go.opentelemetry.io/otel/metric"
Expand All @@ -32,7 +32,7 @@ var (
mutex = sync.Mutex{}

meter otelmetric.Meter
MetricsMap *MessagePoolMetricsMap
MetricsMap *QueueMetricsMap

// Keep tracking of features used by clients to be able to deprecate them
CodeUsage instrument.Int64Counter
Expand All @@ -48,12 +48,12 @@ var (
HousekeeperTotalElements instrument.Int64ObservableGauge

// Message Pool
MessagePoolTimeout instrument.Int64Counter
MessagePoolAck instrument.Int64Counter
MessagePoolNack instrument.Int64Counter
MessagePoolEmptyQueue instrument.Int64Counter
MessagePoolEmptyQueueStorage instrument.Int64Counter
MessagePoolNotFoundInStorage instrument.Int64Counter
QueueTimeout instrument.Int64Counter
QueueAck instrument.Int64Counter
QueueNack instrument.Int64Counter
QueueEmptyQueue instrument.Int64Counter
QueueEmptyQueueStorage instrument.Int64Counter
QueueNotFoundInStorage instrument.Int64Counter

// Storage
StorageLatency instrument.Int64Histogram
Expand Down Expand Up @@ -178,7 +178,7 @@ func createDefaultMetrics() []*dto.LabelPair {
func createMetrics() {
meter = global.MeterProvider().Meter(project.Name)

MetricsMap = NewMessagePoolMetricsMap()
MetricsMap = NewQueueMetricsMap()

// Housekeeper

Expand Down Expand Up @@ -243,39 +243,39 @@ func createMetrics() {
)
panicInstrumentationError(err)

// MessagePool
// Queue

MessagePoolTimeout, err = meter.Int64Counter(
QueueTimeout, err = meter.Int64Counter(
"deckard_message_timeout",
instrument.WithDescription("Number of message timeouts"),
)
panicInstrumentationError(err)

MessagePoolAck, err = meter.Int64Counter(
QueueAck, err = meter.Int64Counter(
"deckard_ack",
instrument.WithDescription("Number of acks received"),
)
panicInstrumentationError(err)

MessagePoolNack, err = meter.Int64Counter(
QueueNack, err = meter.Int64Counter(
"deckard_nack",
instrument.WithDescription("Number of nacks received"),
)
panicInstrumentationError(err)

MessagePoolEmptyQueue, err = meter.Int64Counter(
QueueEmptyQueue, err = meter.Int64Counter(
"deckard_messages_empty_queue",
instrument.WithDescription("Number of times a pull is made against an empty queue."),
)
panicInstrumentationError(err)

MessagePoolEmptyQueueStorage, err = meter.Int64Counter(
QueueEmptyQueueStorage, err = meter.Int64Counter(
"deckard_messages_empty_queue_not_found_storage",
instrument.WithDescription("Number of times a pull is made against an empty queue because the messages were not found in the storage."),
)
panicInstrumentationError(err)

MessagePoolNotFoundInStorage, err = meter.Int64Counter(
QueueNotFoundInStorage, err = meter.Int64Counter(
"deckard_messages_not_found_in_storage",
instrument.WithDescription("Number of messages in cache but not found in the storage."),
)
Expand Down
10 changes: 5 additions & 5 deletions internal/metrics/metrics_map.go
Original file line number Diff line number Diff line change
@@ -1,23 +1,23 @@
package metrics

// Type to hold map of oldest queue elements
type MessagePoolMetricsMap struct {
type QueueMetricsMap struct {
OldestElement map[string]int64
TotalElements map[string]int64
}

func NewMessagePoolMetricsMap() *MessagePoolMetricsMap {
return &MessagePoolMetricsMap{
func NewQueueMetricsMap() *QueueMetricsMap {
return &QueueMetricsMap{
OldestElement: make(map[string]int64, 0),
TotalElements: make(map[string]int64, 0),
}
}

func (oldestMap *MessagePoolMetricsMap) UpdateOldestElementMap(data map[string]int64) {
func (oldestMap *QueueMetricsMap) UpdateOldestElementMap(data map[string]int64) {
oldestMap.OldestElement = mergeData(oldestMap.OldestElement, data)
}

func (oldestMap *MessagePoolMetricsMap) UpdateTotalElementsMap(data map[string]int64) {
func (oldestMap *QueueMetricsMap) UpdateTotalElementsMap(data map[string]int64) {
oldestMap.TotalElements = mergeData(oldestMap.TotalElements, data)
}

Expand Down
6 changes: 3 additions & 3 deletions internal/metrics/metrics_map_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import (
)

func TestChangeMapShouldChangeSuccessfully(t *testing.T) {
data := NewMessagePoolMetricsMap()
data := NewQueueMetricsMap()

require.Empty(t, data.OldestElement)

Expand All @@ -20,7 +20,7 @@ func TestChangeMapShouldChangeSuccessfully(t *testing.T) {
}

func TestChangeMapWithNilMapShouldEmptyMap(t *testing.T) {
data := NewMessagePoolMetricsMap()
data := NewQueueMetricsMap()

data.OldestElement["a"] = int64(123)

Expand All @@ -32,7 +32,7 @@ func TestChangeMapWithNilMapShouldEmptyMap(t *testing.T) {
}

func TestChangeMapWithMissingQueueShouldKeepElementAsZero(t *testing.T) {
data := NewMessagePoolMetricsMap()
data := NewQueueMetricsMap()

data.OldestElement["a"] = int64(123)
data.OldestElement["b"] = int64(432)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import (
"errors"
"time"

"github.com/takenet/deckard/internal/messagepool/entities"
"github.com/takenet/deckard/internal/queue/entities"
)

type Type string
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@ import (

"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"
"github.com/takenet/deckard/internal/messagepool/entities"
"github.com/takenet/deckard/internal/messagepool/utils"
"github.com/takenet/deckard/internal/queue/entities"
"github.com/takenet/deckard/internal/queue/utils"
)

var ctx, cancel = context.WithCancel(context.Background())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@ import (
"sync"
"time"

"github.com/takenet/deckard/internal/messagepool/entities"
"github.com/takenet/deckard/internal/messagepool/utils"
"github.com/takenet/deckard/internal/queue/entities"
"github.com/takenet/deckard/internal/queue/utils"
)

// MemoryStorage is an implementation of the Storage Interface using memory.
Expand Down
File renamed without changes.
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,9 @@ import (
"github.com/meirf/gopart"
"github.com/takenet/deckard/internal/config"
"github.com/takenet/deckard/internal/logger"
"github.com/takenet/deckard/internal/messagepool/entities"
"github.com/takenet/deckard/internal/messagepool/utils"
"github.com/takenet/deckard/internal/metrics"
"github.com/takenet/deckard/internal/queue/entities"
"github.com/takenet/deckard/internal/queue/utils"
"go.opentelemetry.io/otel/attribute"
)

Expand Down
File renamed without changes.
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import (
"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"
"github.com/takenet/deckard/internal/config"
"github.com/takenet/deckard/internal/messagepool/entities"
"github.com/takenet/deckard/internal/queue/entities"
)

func TestRedisCacheIntegration(t *testing.T) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import (
"strings"
"time"

"github.com/takenet/deckard/internal/messagepool/utils"
"github.com/takenet/deckard/internal/queue/utils"
"google.golang.org/protobuf/types/known/anypb"
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import (
"testing"

"github.com/stretchr/testify/require"
"github.com/takenet/deckard/internal/messagepool/utils"
"github.com/takenet/deckard/internal/queue/utils"
)

func TestMaxScore(t *testing.T) {
Expand Down
Loading

0 comments on commit 87bcec3

Please sign in to comment.