Skip to content

Commit

Permalink
feat: [#389] The queue log needs to be controled by APP_DEBUG (#459)
Browse files Browse the repository at this point in the history
* feat: [#389] The queue log needs to be controled by APP_DEBUG

* fix unit test
  • Loading branch information
hwbrzzl authored Apr 18, 2024
1 parent 87a80b3 commit 2aa80fb
Show file tree
Hide file tree
Showing 9 changed files with 383 additions and 38 deletions.
5 changes: 4 additions & 1 deletion mail/application_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/stretchr/testify/suite"

configmock "github.com/goravel/framework/contracts/config/mocks"
logmock "github.com/goravel/framework/contracts/log/mocks"
"github.com/goravel/framework/contracts/mail"
queuecontract "github.com/goravel/framework/contracts/queue"
"github.com/goravel/framework/queue"
Expand Down Expand Up @@ -85,8 +86,9 @@ func (s *ApplicationTestSuite) TestSendMailWithFrom() {

func (s *ApplicationTestSuite) TestQueueMail() {
mockConfig := mockConfig(587, s.redisPort)
mockLog := &logmock.Log{}

queueFacade := queue.NewApplication(mockConfig)
queueFacade := queue.NewApplication(mockConfig, mockLog)
queueFacade.Register([]queuecontract.Job{
NewSendMailJob(mockConfig),
})
Expand Down Expand Up @@ -115,6 +117,7 @@ func (s *ApplicationTestSuite) TestQueueMail() {
func mockConfig(mailPort, redisPort int) *configmock.Config {
mockConfig := &configmock.Config{}
mockConfig.On("GetString", "app.name").Return("goravel")
mockConfig.On("GetBool", "app.debug").Return(false)
mockConfig.On("GetString", "queue.default").Return("redis")
mockConfig.On("GetString", "queue.connections.sync.driver").Return("sync")
mockConfig.On("GetString", "queue.connections.redis.driver").Return("redis")
Expand Down
13 changes: 8 additions & 5 deletions queue/application.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,31 +2,34 @@ package queue

import (
configcontract "github.com/goravel/framework/contracts/config"
"github.com/goravel/framework/contracts/log"
"github.com/goravel/framework/contracts/queue"
)

type Application struct {
config *Config
jobs []queue.Job
log log.Log
}

func NewApplication(config configcontract.Config) *Application {
func NewApplication(config configcontract.Config, log log.Log) *Application {
return &Application{
config: NewConfig(config),
log: log,
}
}

func (app *Application) Worker(args *queue.Args) queue.Worker {
defaultConnection := app.config.DefaultConnection()

if args == nil {
return NewWorker(app.config, 1, defaultConnection, app.jobs, app.config.Queue(defaultConnection, ""))
return NewWorker(app.config, app.log, 1, defaultConnection, app.jobs, app.config.Queue(defaultConnection, ""))
}
if args.Connection == "" {
args.Connection = defaultConnection
}

return NewWorker(app.config, args.Concurrent, args.Connection, app.jobs, app.config.Queue(args.Connection, args.Queue))
return NewWorker(app.config, app.log, args.Concurrent, args.Connection, app.jobs, app.config.Queue(args.Connection, args.Queue))
}

func (app *Application) Register(jobs []queue.Job) {
Expand All @@ -38,9 +41,9 @@ func (app *Application) GetJobs() []queue.Job {
}

func (app *Application) Job(job queue.Job, args []queue.Arg) queue.Task {
return NewTask(app.config, job, args)
return NewTask(app.config, app.log, job, args)
}

func (app *Application) Chain(jobs []queue.Jobs) queue.Task {
return NewChainTask(app.config, jobs)
return NewChainTask(app.config, app.log, jobs)
}
109 changes: 87 additions & 22 deletions queue/application_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,33 +9,35 @@ import (

"github.com/ory/dockertest/v3"
"github.com/spf13/cast"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/suite"

configmock "github.com/goravel/framework/contracts/config/mocks"
logmock "github.com/goravel/framework/contracts/log/mocks"
"github.com/goravel/framework/contracts/queue"
queuemock "github.com/goravel/framework/contracts/queue/mocks"
"github.com/goravel/framework/support/carbon"
testingdocker "github.com/goravel/framework/support/docker"
)

var (
testSyncJob = 0
testAsyncJob = 0
testDelayAsyncJob = 0
testCustomAsyncJob = 0
testErrorAsyncJob = 0
testChainAsyncJob = 0
testChainSyncJob = 0
testChainAsyncJobError = 0
testChainSyncJobError = 0
testSyncJob = 0
testAsyncJob = 0
testAsyncJobOfDisableDebug = 0
testDelayAsyncJob = 0
testCustomAsyncJob = 0
testErrorAsyncJob = 0
testChainAsyncJob = 0
testChainSyncJob = 0
testChainAsyncJobError = 0
testChainSyncJobError = 0
)

type QueueTestSuite struct {
suite.Suite
app *Application
redisResource *dockertest.Resource
mockConfig *configmock.Config
mockQueue *queuemock.Queue
mockLog *logmock.Log
}

func TestQueueTestSuite(t *testing.T) {
Expand All @@ -59,8 +61,8 @@ func TestQueueTestSuite(t *testing.T) {

func (s *QueueTestSuite) SetupTest() {
s.mockConfig = &configmock.Config{}
s.mockQueue = &queuemock.Queue{}
s.app = NewApplication(s.mockConfig)
s.mockLog = &logmock.Log{}
s.app = NewApplication(s.mockConfig, s.mockLog)
}

func (s *QueueTestSuite) TestSyncQueue() {
Expand All @@ -72,17 +74,61 @@ func (s *QueueTestSuite) TestSyncQueue() {
s.Equal(1, testSyncJob)
}

func (s *QueueTestSuite) TestDefaultAsyncQueue() {
func (s *QueueTestSuite) TestDefaultAsyncQueue_EnableDebug() {
s.mockConfig.On("GetString", "queue.default").Return("redis").Twice()
s.mockConfig.On("GetString", "app.name").Return("goravel").Times(4)
s.mockConfig.On("GetBool", "app.debug").Return(true).Times(2)
s.mockConfig.On("GetString", "queue.connections.redis.queue", "default").Return("default").Times(2)
s.mockConfig.On("GetString", "queue.connections.redis.driver").Return("redis").Times(3)
s.mockConfig.On("GetString", "queue.connections.redis.connection").Return("default").Twice()
s.mockConfig.On("GetString", "database.redis.default.host").Return("localhost").Twice()
s.mockConfig.On("GetString", "database.redis.default.password").Return("").Twice()
s.mockConfig.On("GetInt", "database.redis.default.port").Return(cast.ToInt(s.redisResource.GetPort("6379/tcp"))).Twice()
s.mockConfig.On("GetInt", "database.redis.default.database").Return(0).Twice()
s.mockLog.On("Infof", "Launching a worker with the following settings:").Once()
s.mockLog.On("Infof", "- Broker: %s", "://").Once()
s.mockLog.On("Infof", "- DefaultQueue: %s", "goravel_queues:debug").Once()
s.mockLog.On("Infof", "- ResultBackend: %s", "://").Once()
s.mockLog.On("Info", "[*] Waiting for messages. To exit press CTRL+C").Once()
s.mockLog.On("Debugf", "Received new message: %s", mock.Anything).Once()
s.mockLog.On("Debugf", "Processed task %s. Results = %s", mock.Anything, mock.Anything).Once()
s.app.jobs = []queue.Job{&TestAsyncJob{}}

ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
go func(ctx context.Context) {
s.Nil(s.app.Worker(&queue.Args{
Queue: "debug",
}).Run())

for range ctx.Done() {
return
}
}(ctx)
time.Sleep(2 * time.Second)
s.Nil(s.app.Job(&TestAsyncJob{}, []queue.Arg{
{Type: "string", Value: "TestDefaultAsyncQueue_EnableDebug"},
{Type: "int", Value: 1},
}).OnQueue("debug").Dispatch())
time.Sleep(2 * time.Second)
s.Equal(1, testAsyncJob)

s.mockConfig.AssertExpectations(s.T())
s.mockLog.AssertExpectations(s.T())
}

func (s *QueueTestSuite) TestDefaultAsyncQueue_DisableDebug() {
s.mockConfig.On("GetString", "queue.default").Return("redis").Twice()
s.mockConfig.On("GetString", "app.name").Return("goravel").Times(3)
s.mockConfig.On("GetBool", "app.debug").Return(false).Times(2)
s.mockConfig.On("GetString", "queue.connections.redis.queue", "default").Return("default").Times(3)
s.mockConfig.On("GetString", "queue.connections.redis.driver").Return("redis").Times(3)
s.mockConfig.On("GetString", "queue.connections.redis.connection").Return("default").Twice()
s.mockConfig.On("GetString", "database.redis.default.host").Return("localhost").Twice()
s.mockConfig.On("GetString", "database.redis.default.password").Return("").Twice()
s.mockConfig.On("GetInt", "database.redis.default.port").Return(cast.ToInt(s.redisResource.GetPort("6379/tcp"))).Twice()
s.mockConfig.On("GetInt", "database.redis.default.database").Return(0).Twice()
s.app.jobs = []queue.Job{&TestAsyncJob{}}
s.app.jobs = []queue.Job{&TestAsyncJobOfDisableDebug{}}

ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
Expand All @@ -94,20 +140,21 @@ func (s *QueueTestSuite) TestDefaultAsyncQueue() {
}
}(ctx)
time.Sleep(2 * time.Second)
s.Nil(s.app.Job(&TestAsyncJob{}, []queue.Arg{
{Type: "string", Value: "TestDefaultAsyncQueue"},
s.Nil(s.app.Job(&TestAsyncJobOfDisableDebug{}, []queue.Arg{
{Type: "string", Value: "TestDefaultAsyncQueue_DisableDebug"},
{Type: "int", Value: 1},
}).Dispatch())
time.Sleep(2 * time.Second)
s.Equal(1, testAsyncJob)
s.Equal(1, testAsyncJobOfDisableDebug)

s.mockConfig.AssertExpectations(s.T())
s.mockQueue.AssertExpectations(s.T())
s.mockLog.AssertExpectations(s.T())
}

func (s *QueueTestSuite) TestDelayAsyncQueue() {
s.mockConfig.On("GetString", "queue.default").Return("redis").Times(2)
s.mockConfig.On("GetString", "app.name").Return("goravel").Times(4)
s.mockConfig.On("GetBool", "app.debug").Return(false).Times(2)
s.mockConfig.On("GetString", "queue.connections.redis.queue", "default").Return("default").Twice()
s.mockConfig.On("GetString", "queue.connections.redis.driver").Return("redis").Times(3)
s.mockConfig.On("GetString", "queue.connections.redis.connection").Return("default").Twice()
Expand Down Expand Up @@ -139,12 +186,12 @@ func (s *QueueTestSuite) TestDelayAsyncQueue() {
s.Equal(1, testDelayAsyncJob)

s.mockConfig.AssertExpectations(s.T())
s.mockQueue.AssertExpectations(s.T())
}

func (s *QueueTestSuite) TestCustomAsyncQueue() {
s.mockConfig.On("GetString", "queue.default").Return("redis").Twice()
s.mockConfig.On("GetString", "app.name").Return("goravel").Times(4)
s.mockConfig.On("GetBool", "app.debug").Return(false).Times(2)
s.mockConfig.On("GetString", "queue.connections.custom.queue", "default").Return("default").Twice()
s.mockConfig.On("GetString", "queue.connections.custom.driver").Return("redis").Times(3)
s.mockConfig.On("GetString", "queue.connections.custom.connection").Return("default").Twice()
Expand Down Expand Up @@ -176,12 +223,12 @@ func (s *QueueTestSuite) TestCustomAsyncQueue() {
s.Equal(1, testCustomAsyncJob)

s.mockConfig.AssertExpectations(s.T())
s.mockQueue.AssertExpectations(s.T())
}

func (s *QueueTestSuite) TestErrorAsyncQueue() {
s.mockConfig.On("GetString", "queue.default").Return("redis").Twice()
s.mockConfig.On("GetString", "app.name").Return("goravel").Times(4)
s.mockConfig.On("GetBool", "app.debug").Return(false).Times(2)
s.mockConfig.On("GetString", "queue.connections.redis.queue", "default").Return("default").Twice()
s.mockConfig.On("GetString", "queue.connections.redis.driver").Return("redis").Times(3)
s.mockConfig.On("GetString", "queue.connections.redis.connection").Return("default").Twice()
Expand Down Expand Up @@ -211,12 +258,12 @@ func (s *QueueTestSuite) TestErrorAsyncQueue() {
s.Equal(0, testErrorAsyncJob)

s.mockConfig.AssertExpectations(s.T())
s.mockQueue.AssertExpectations(s.T())
}

func (s *QueueTestSuite) TestChainAsyncQueue() {
s.mockConfig.On("GetString", "queue.default").Return("redis").Times(2)
s.mockConfig.On("GetString", "app.name").Return("goravel").Times(4)
s.mockConfig.On("GetBool", "app.debug").Return(false).Times(2)
s.mockConfig.On("GetString", "queue.connections.redis.queue", "default").Return("default").Twice()
s.mockConfig.On("GetString", "queue.connections.redis.driver").Return("redis").Times(3)
s.mockConfig.On("GetString", "queue.connections.redis.connection").Return("default").Twice()
Expand Down Expand Up @@ -266,13 +313,15 @@ func (s *QueueTestSuite) TestChainAsyncQueue() {
func (s *QueueTestSuite) TestChainAsyncQueue_Error() {
s.mockConfig.On("GetString", "queue.default").Return("redis").Times(2)
s.mockConfig.On("GetString", "app.name").Return("goravel").Times(4)
s.mockConfig.On("GetBool", "app.debug").Return(false).Times(2)
s.mockConfig.On("GetString", "queue.connections.redis.queue", "default").Return("default").Twice()
s.mockConfig.On("GetString", "queue.connections.redis.driver").Return("redis").Times(3)
s.mockConfig.On("GetString", "queue.connections.redis.connection").Return("default").Twice()
s.mockConfig.On("GetString", "database.redis.default.host").Return("localhost").Twice()
s.mockConfig.On("GetString", "database.redis.default.password").Return("").Twice()
s.mockConfig.On("GetInt", "database.redis.default.port").Return(cast.ToInt(s.redisResource.GetPort("6379/tcp"))).Twice()
s.mockConfig.On("GetInt", "database.redis.default.database").Return(0).Twice()
s.mockLog.On("Errorf", "Failed processing task %s. Error = %v", mock.Anything, errors.New("error")).Once()
s.app.jobs = []queue.Job{&TestChainAsyncJob{}, &TestChainSyncJob{}}

ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
Expand Down Expand Up @@ -306,6 +355,7 @@ func (s *QueueTestSuite) TestChainAsyncQueue_Error() {
s.Equal(0, testChainSyncJobError)

s.mockConfig.AssertExpectations(s.T())
s.mockLog.AssertExpectations(s.T())
}

type TestAsyncJob struct {
Expand All @@ -323,6 +373,21 @@ func (receiver *TestAsyncJob) Handle(args ...any) error {
return nil
}

type TestAsyncJobOfDisableDebug struct {
}

// Signature The name and signature of the job.
func (receiver *TestAsyncJobOfDisableDebug) Signature() string {
return "test_async_job_of_disable_debug"
}

// Handle Execute the job.
func (receiver *TestAsyncJobOfDisableDebug) Handle(args ...any) error {
testAsyncJobOfDisableDebug++

return nil
}

type TestDelayAsyncJob struct {
}

Expand Down
Loading

0 comments on commit 2aa80fb

Please sign in to comment.