diff --git a/pluginmanager/config_update_test.go b/pluginmanager/config_update_test.go index e046164a6d..66a3d6aef7 100644 --- a/pluginmanager/config_update_test.go +++ b/pluginmanager/config_update_test.go @@ -80,8 +80,7 @@ func (s *configUpdateTestSuite) TestConfigUpdate() { // unblock old config checkFlusher.Block = false time.Sleep(time.Second * time.Duration(5)) - s.Equal(10000, checkFlusher.GetLogCount()) - // this magic number(10000) must exceed number of logs can be hold in processor channel(LogsChan) + aggregator buffer(defaultLogGroup) + flusher channel(LogGroupsChan) + s.Equal(0, checkFlusher.GetLogCount()) LogtailConfigLock.RLock() s.Equal(20000, GetConfigFlushers(LogtailConfig[noblockUpdateConfigName].PluginRunner)[0].(*checker.FlusherChecker).GetLogCount()) LogtailConfigLock.RUnlock() @@ -104,7 +103,7 @@ func (s *configUpdateTestSuite) TestConfigUpdateMany() { s.Equal(0, checkFlusher.GetLogCount(), "the hold on block flusher checker doesn't have any logs") checkFlusher.Block = false time.Sleep(time.Second * time.Duration(5)) - s.Equal(checkFlusher.GetLogCount(), 10000) + s.Equal(checkFlusher.GetLogCount(), 0) // load normal config for i := 0; i < 3; i++ { @@ -183,5 +182,5 @@ func (s *configUpdateTestSuite) TestStopAllExitTimeout() { s.Equal(0, checkFlusher.GetLogCount()) checkFlusher.Block = false time.Sleep(time.Second * time.Duration(5)) - s.Equal(10000, checkFlusher.GetLogCount()) + s.Equal(0, checkFlusher.GetLogCount()) } diff --git a/pluginmanager/plugin_runner_v1.go b/pluginmanager/plugin_runner_v1.go index b4364b1ded..ed8578873c 100644 --- a/pluginmanager/plugin_runner_v1.go +++ b/pluginmanager/plugin_runner_v1.go @@ -383,6 +383,8 @@ func (p *pluginv1Runner) Stop(exit bool) error { for _, flusher := range p.FlusherPlugins { flusher.Flusher.SetUrgent(exit) } + p.LogstoreConfig.FlushOutFlag.Store(true) + for _, service := range p.ServicePlugins { _ = service.Stop() } @@ -395,7 +397,6 @@ func (p *pluginv1Runner) Stop(exit bool) error { p.AggregateControl.WaitCancel() logger.Info(p.LogstoreConfig.Context.GetRuntimeContext(), "aggregator plugins stop", "done") - p.LogstoreConfig.FlushOutFlag.Store(true) p.FlushControl.WaitCancel() if exit && p.FlushOutStore.Len() > 0 { diff --git a/pluginmanager/plugin_runner_v2.go b/pluginmanager/plugin_runner_v2.go index 46217d7afa..2db673c426 100644 --- a/pluginmanager/plugin_runner_v2.go +++ b/pluginmanager/plugin_runner_v2.go @@ -397,6 +397,8 @@ func (p *pluginv2Runner) Stop(exit bool) error { for _, flusher := range p.FlusherPlugins { flusher.Flusher.SetUrgent(exit) } + p.LogstoreConfig.FlushOutFlag.Store(true) + for _, serviceInput := range p.ServicePlugins { _ = serviceInput.Input.Stop() } @@ -409,7 +411,6 @@ func (p *pluginv2Runner) Stop(exit bool) error { p.AggregateControl.WaitCancel() logger.Info(p.LogstoreConfig.Context.GetRuntimeContext(), "aggregator plugins stop", "done") - p.LogstoreConfig.FlushOutFlag.Store(true) p.FlushControl.WaitCancel() if exit && p.FlushOutStore.Len() > 0 {