From 65eb663ada3786c69095e630add566bee6e64360 Mon Sep 17 00:00:00 2001 From: Steven Guiheux Date: Mon, 7 Jun 2021 11:20:35 +0200 Subject: [PATCH 1/5] fix: do not get item from a unit taht can't sync --- engine/cdn/cdn_item.go | 1 + engine/cdn/storage/types.go | 20 ++++++++++++++++++++ 2 files changed, 21 insertions(+) diff --git a/engine/cdn/cdn_item.go b/engine/cdn/cdn_item.go index 6c19e4b02d..a98b88a440 100644 --- a/engine/cdn/cdn_item.go +++ b/engine/cdn/cdn_item.go @@ -386,6 +386,7 @@ func (s *Service) getRandomItemUnitIDByItemID(ctx context.Context, itemID string itemUnits = s.Units.FilterItemUnitReaderByType(itemUnits) itemUnits = s.Units.FilterItemUnitFromBuffer(itemUnits) + itemUnits = s.Units.FilterNotSyncBackend(itemUnits) if len(itemUnits) == 0 { return "", "", sdk.WithStack(fmt.Errorf("unable to find item units for item with id: %s", itemID)) diff --git a/engine/cdn/storage/types.go b/engine/cdn/storage/types.go index d421ffc587..781fc2dda0 100644 --- a/engine/cdn/storage/types.go +++ b/engine/cdn/storage/types.go @@ -286,6 +286,26 @@ func (x RunningStorageUnits) GetBuffer(bufferType sdk.CDNItemType) BufferUnit { } } +func (x *RunningStorageUnits) CanSync(unitID string) bool { + for _, unit := range x.Storages { + if unit.ID() == unitID { + return unit.CanSync() + } + } + return false +} + +func (x *RunningStorageUnits) FilterNotSyncBackend(ius []sdk.CDNItemUnit) []sdk.CDNItemUnit { + itemsUnits := make([]sdk.CDNItemUnit, 0, len(ius)) + for _, u := range ius { + if !x.CanSync(u.UnitID) { + continue + } + itemsUnits = append(itemsUnits, u) + } + return itemsUnits +} + func (x *RunningStorageUnits) FilterItemUnitFromBuffer(ius []sdk.CDNItemUnit) []sdk.CDNItemUnit { itemsUnits := make([]sdk.CDNItemUnit, 0, len(ius)) for _, u := range ius { From 119d2a268cf557ef1d1e8ef5b858ced840aef160 Mon Sep 17 00:00:00 2001 From: Steven Guiheux Date: Mon, 7 Jun 2021 11:46:51 +0200 Subject: [PATCH 2/5] feat: do not send logs in CDS --- engine/cdn/cdn_log_engine.go | 37 +++--------------------------------- 1 file changed, 3 insertions(+), 34 deletions(-) diff --git a/engine/cdn/cdn_log_engine.go b/engine/cdn/cdn_log_engine.go index d6e9f659c3..4e462d3226 100644 --- a/engine/cdn/cdn_log_engine.go +++ b/engine/cdn/cdn_log_engine.go @@ -3,11 +3,9 @@ package cdn import ( "context" "encoding/json" - "fmt" "strings" "time" - gocache "github.com/patrickmn/go-cache" "github.com/rockbears/log" "github.com/ovh/cds/engine/cache" @@ -126,21 +124,12 @@ func (s *Service) dequeueMessages(ctx context.Context, jobLogsQueueKey string, q hms = append(hms, hm) } - // Send TO CDS API - if err := s.sendToCDS(ctx, hms); err != nil { - err = sdk.WrapError(err, "unable to send log to API") + // Send TO CDN Buffer + if err := s.sendToBufferWithRetry(ctx, hms); err != nil { + err = sdk.WrapError(err, "unable to send log into buffer") ctx = sdk.ContextWithStacktrace(ctx, err) log.Error(ctx, err.Error()) } - - // Send TO CDN Buffer - if s.cdnEnabled(ctx, hms[0].Signature.ProjectKey) { - if err := s.sendToBufferWithRetry(ctx, hms); err != nil { - err = sdk.WrapError(err, "unable to send log into buffer") - ctx = sdk.ContextWithStacktrace(ctx, err) - log.Error(ctx, err.Error()) - } - } nbMessages += len(msgs) t1 = time.Now() } @@ -191,23 +180,3 @@ func (s *Service) canDequeue(ctx context.Context, jobID string) (string, error) } return jobQueueKey, nil } - -// Check if storage on CDN is enabled -func (s *Service) cdnEnabled(ctx context.Context, projectKey string) bool { - cacheKey := fmt.Sprintf("cdn-job-logs-enabled-project-%s", projectKey) - enabledI, has := runCache.Get(cacheKey) - if has { - return enabledI.(bool) - } - - resp, err := s.Client.FeatureEnabled(sdk.FeatureCDNJobLogs, map[string]string{ - "project_key": projectKey, - }) - if err != nil { - log.Error(ctx, "unable to get job logs feature for project %s: %v", projectKey, err) - return false - } - enabled := !resp.Exists || resp.Enabled - runCache.Set(cacheKey, enabled, gocache.DefaultExpiration) - return enabled -} From d154c9583c6ad9611815a5af7d8bc3a11325551f Mon Sep 17 00:00:00 2001 From: Steven Guiheux Date: Mon, 7 Jun 2021 11:53:13 +0200 Subject: [PATCH 3/5] fix: remove unused code --- engine/cdn/cdn_log_store.go | 58 ------------------------------------- 1 file changed, 58 deletions(-) diff --git a/engine/cdn/cdn_log_store.go b/engine/cdn/cdn_log_store.go index 76b76ee055..17b43775e0 100644 --- a/engine/cdn/cdn_log_store.go +++ b/engine/cdn/cdn_log_store.go @@ -2,7 +2,6 @@ package cdn import ( "context" - "fmt" "time" "github.com/rockbears/log" @@ -14,63 +13,6 @@ import ( "github.com/ovh/cds/sdk/cdn" ) -func (s *Service) sendToCDS(ctx context.Context, msgs []handledMessage) error { - switch { - case msgs[0].Signature.Service != nil: - for _, msg := range msgs { - // Format line - msg.Msg.Full = buildMessage(msg) - if msg.Signature.Service != nil { - logs := sdk.ServiceLog{ - ServiceRequirementName: msg.Signature.Service.RequirementName, - ServiceRequirementID: msg.Signature.Service.RequirementID, - WorkflowNodeJobRunID: msg.Signature.JobID, - WorkflowNodeRunID: msg.Signature.NodeRunID, - Val: msg.Msg.Full, - } - if err := s.Client.QueueServiceLogs(ctx, []sdk.ServiceLog{logs}); err != nil { - return err - } - } - } - return nil - default: - // Aggregate messages by step - hms := make(map[string]handledMessage, len(msgs)) - for _, msg := range msgs { - // Format line - msg.Msg.Full = buildMessage(msg) - - k := fmt.Sprintf("%d-%d-%d", msg.Signature.JobID, msg.Signature.NodeRunID, msg.Signature.Worker.StepOrder) - // Aggregates lines in a single message - if _, ok := hms[k]; ok { - full := hms[k].Msg.Full - msg.Msg.Full = fmt.Sprintf("%s%s", full, msg.Msg.Full) - hms[k] = msg - } else { - hms[k] = msg - } - } - - // Send logs to CDS API by step - for _, hm := range hms { - now := time.Now() - l := sdk.Log{ - JobID: hm.Signature.JobID, - NodeRunID: hm.Signature.NodeRunID, - LastModified: &now, - StepOrder: hm.Signature.Worker.StepOrder, - Val: hm.Msg.Full, - } - if err := s.Client.QueueSendLogs(ctx, hm.Signature.JobID, l); err != nil { - return err - } - } - } - - return nil -} - func (s *Service) sendToBufferWithRetry(ctx context.Context, hms []handledMessage) error { if len(hms) == 0 { return nil From e73617bd98bd4d2f5cbeee7e1373c19592736a6b Mon Sep 17 00:00:00 2001 From: Steven Guiheux Date: Mon, 7 Jun 2021 12:00:27 +0200 Subject: [PATCH 4/5] fix: remove unsync storage from stat --- engine/cdn/cdn_metrics.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/engine/cdn/cdn_metrics.go b/engine/cdn/cdn_metrics.go index 9ee91be587..0dd1240d8a 100644 --- a/engine/cdn/cdn_metrics.go +++ b/engine/cdn/cdn_metrics.go @@ -122,6 +122,9 @@ func (s *Service) ComputeMetrics(ctx context.Context) { var storageStats []storage.Stat for _, su := range s.Units.Storages { + if !su.CanSync() { + continue + } storageStats = append(storageStats, s.countItemsForUnit(ctx, su)...) } From a29480ba13343b26c38d65444805880b4ab1a27a Mon Sep 17 00:00:00 2001 From: Steven Guiheux Date: Fri, 11 Jun 2021 15:55:42 +0200 Subject: [PATCH 5/5] fix: revert send log to cds + add index --- engine/cdn/cdn_log_engine.go | 37 +++++++++++++++-- engine/cdn/cdn_log_store.go | 58 +++++++++++++++++++++++++++ engine/sql/cdn/014_cdn_type_index.sql | 7 ++++ 3 files changed, 99 insertions(+), 3 deletions(-) create mode 100644 engine/sql/cdn/014_cdn_type_index.sql diff --git a/engine/cdn/cdn_log_engine.go b/engine/cdn/cdn_log_engine.go index 4e462d3226..d6e9f659c3 100644 --- a/engine/cdn/cdn_log_engine.go +++ b/engine/cdn/cdn_log_engine.go @@ -3,9 +3,11 @@ package cdn import ( "context" "encoding/json" + "fmt" "strings" "time" + gocache "github.com/patrickmn/go-cache" "github.com/rockbears/log" "github.com/ovh/cds/engine/cache" @@ -124,12 +126,21 @@ func (s *Service) dequeueMessages(ctx context.Context, jobLogsQueueKey string, q hms = append(hms, hm) } - // Send TO CDN Buffer - if err := s.sendToBufferWithRetry(ctx, hms); err != nil { - err = sdk.WrapError(err, "unable to send log into buffer") + // Send TO CDS API + if err := s.sendToCDS(ctx, hms); err != nil { + err = sdk.WrapError(err, "unable to send log to API") ctx = sdk.ContextWithStacktrace(ctx, err) log.Error(ctx, err.Error()) } + + // Send TO CDN Buffer + if s.cdnEnabled(ctx, hms[0].Signature.ProjectKey) { + if err := s.sendToBufferWithRetry(ctx, hms); err != nil { + err = sdk.WrapError(err, "unable to send log into buffer") + ctx = sdk.ContextWithStacktrace(ctx, err) + log.Error(ctx, err.Error()) + } + } nbMessages += len(msgs) t1 = time.Now() } @@ -180,3 +191,23 @@ func (s *Service) canDequeue(ctx context.Context, jobID string) (string, error) } return jobQueueKey, nil } + +// Check if storage on CDN is enabled +func (s *Service) cdnEnabled(ctx context.Context, projectKey string) bool { + cacheKey := fmt.Sprintf("cdn-job-logs-enabled-project-%s", projectKey) + enabledI, has := runCache.Get(cacheKey) + if has { + return enabledI.(bool) + } + + resp, err := s.Client.FeatureEnabled(sdk.FeatureCDNJobLogs, map[string]string{ + "project_key": projectKey, + }) + if err != nil { + log.Error(ctx, "unable to get job logs feature for project %s: %v", projectKey, err) + return false + } + enabled := !resp.Exists || resp.Enabled + runCache.Set(cacheKey, enabled, gocache.DefaultExpiration) + return enabled +} diff --git a/engine/cdn/cdn_log_store.go b/engine/cdn/cdn_log_store.go index 17b43775e0..76b76ee055 100644 --- a/engine/cdn/cdn_log_store.go +++ b/engine/cdn/cdn_log_store.go @@ -2,6 +2,7 @@ package cdn import ( "context" + "fmt" "time" "github.com/rockbears/log" @@ -13,6 +14,63 @@ import ( "github.com/ovh/cds/sdk/cdn" ) +func (s *Service) sendToCDS(ctx context.Context, msgs []handledMessage) error { + switch { + case msgs[0].Signature.Service != nil: + for _, msg := range msgs { + // Format line + msg.Msg.Full = buildMessage(msg) + if msg.Signature.Service != nil { + logs := sdk.ServiceLog{ + ServiceRequirementName: msg.Signature.Service.RequirementName, + ServiceRequirementID: msg.Signature.Service.RequirementID, + WorkflowNodeJobRunID: msg.Signature.JobID, + WorkflowNodeRunID: msg.Signature.NodeRunID, + Val: msg.Msg.Full, + } + if err := s.Client.QueueServiceLogs(ctx, []sdk.ServiceLog{logs}); err != nil { + return err + } + } + } + return nil + default: + // Aggregate messages by step + hms := make(map[string]handledMessage, len(msgs)) + for _, msg := range msgs { + // Format line + msg.Msg.Full = buildMessage(msg) + + k := fmt.Sprintf("%d-%d-%d", msg.Signature.JobID, msg.Signature.NodeRunID, msg.Signature.Worker.StepOrder) + // Aggregates lines in a single message + if _, ok := hms[k]; ok { + full := hms[k].Msg.Full + msg.Msg.Full = fmt.Sprintf("%s%s", full, msg.Msg.Full) + hms[k] = msg + } else { + hms[k] = msg + } + } + + // Send logs to CDS API by step + for _, hm := range hms { + now := time.Now() + l := sdk.Log{ + JobID: hm.Signature.JobID, + NodeRunID: hm.Signature.NodeRunID, + LastModified: &now, + StepOrder: hm.Signature.Worker.StepOrder, + Val: hm.Msg.Full, + } + if err := s.Client.QueueSendLogs(ctx, hm.Signature.JobID, l); err != nil { + return err + } + } + } + + return nil +} + func (s *Service) sendToBufferWithRetry(ctx context.Context, hms []handledMessage) error { if len(hms) == 0 { return nil diff --git a/engine/sql/cdn/014_cdn_type_index.sql b/engine/sql/cdn/014_cdn_type_index.sql new file mode 100644 index 0000000000..a3a9237ba2 --- /dev/null +++ b/engine/sql/cdn/014_cdn_type_index.sql @@ -0,0 +1,7 @@ +-- +migrate Up +SELECT create_index('item', 'idx_item_type_status', 'type,status'); +SELECT create_index('storage_unit_item', 'idx_storage_unit_type_item_unit_id', 'type,unit_id'); + +-- +migrate Down +DROP INDEX "idx_item_type_status"; +DROP INDEX "idx_storage_unit_type_item_unit_id"