diff --git a/.github/workflows/create-gh-release.yaml b/.github/workflows/create-gh-release.yaml index cea150c..543062f 100644 --- a/.github/workflows/create-gh-release.yaml +++ b/.github/workflows/create-gh-release.yaml @@ -16,4 +16,4 @@ jobs: uses: SneaksAndData/github-actions/semver_release@v0.1.9 with: major_v: 0 - minor_v: 2 + minor_v: 3 diff --git a/pkg/checkpoint/models/algorithm_request.go b/pkg/checkpoint/models/algorithm_request.go index 7635d6b..182bc73 100644 --- a/pkg/checkpoint/models/algorithm_request.go +++ b/pkg/checkpoint/models/algorithm_request.go @@ -13,5 +13,5 @@ type AlgorithmRequest struct { MonitoringMetadata map[string][]string `json:"monitoringMetadata,omitempty"` RequestApiVersion string `json:"requestApiVersion,omitempty"` Tag string `json:"tag,omitempty"` - ParentRequest ParentAlgorithmRequest `json:"parentRequest,omitempty"` + ParentRequest *ParentAlgorithmRequest `json:"parentRequest,omitempty"` } diff --git a/pkg/checkpoint/models/checkpointed_request.go b/pkg/checkpoint/models/checkpointed_request.go index 3f6192d..3514dc9 100644 --- a/pkg/checkpoint/models/checkpointed_request.go +++ b/pkg/checkpoint/models/checkpointed_request.go @@ -64,7 +64,7 @@ type CheckpointedRequestCqlModel struct { SentAt time.Time AppliedConfiguration string ConfigurationOverrides string - MonitoringMetadata string + MonitoringMetadata map[string][]string ContentHash string LastModified time.Time Tag string @@ -74,7 +74,7 @@ type CheckpointedRequestCqlModel struct { } var CheckpointedRequestTable = table.New(table.Metadata{ - Name: "checkpoints", + Name: "nexus.checkpoints", Columns: []string{ "algorithm", "id", @@ -106,7 +106,6 @@ var CheckpointedRequestTable = table.New(table.Metadata{ func (cr *CheckpointedRequest) ToCqlModel() *CheckpointedRequestCqlModel { serializedConfig, _ := json.Marshal(cr.AppliedConfiguration) serializedOverrides, _ := json.Marshal(cr.ConfigurationOverrides) - serializedMetadata, _ := json.Marshal(cr.MonitoringMetadata) return &CheckpointedRequestCqlModel{ Algorithm: cr.Algorithm, @@ -122,7 +121,7 @@ func (cr *CheckpointedRequest) ToCqlModel() *CheckpointedRequestCqlModel { SentAt: cr.SentAt, AppliedConfiguration: string(serializedConfig), ConfigurationOverrides: string(serializedOverrides), - MonitoringMetadata: string(serializedMetadata), + MonitoringMetadata: cr.MonitoringMetadata, ContentHash: cr.ContentHash, LastModified: cr.LastModified, Tag: cr.Tag, diff --git a/pkg/checkpoint/models/submission_buffer_entry.go b/pkg/checkpoint/models/submission_buffer_entry.go index a39258f..0d6b7e5 100644 --- a/pkg/checkpoint/models/submission_buffer_entry.go +++ b/pkg/checkpoint/models/submission_buffer_entry.go @@ -7,7 +7,7 @@ import ( ) var SubmissionBufferTable = table.New(table.Metadata{ - Name: "submission_buffer", + Name: "nexus.submission_buffer", Columns: []string{ "algorithm", "id", diff --git a/pkg/checkpoint/request/buffer.go b/pkg/checkpoint/request/buffer.go index 4b99d4a..e199536 100644 --- a/pkg/checkpoint/request/buffer.go +++ b/pkg/checkpoint/request/buffer.go @@ -98,7 +98,7 @@ func (buffer *DefaultBuffer) Start(submitter pipeline.StageActor[*BufferOutput, submitter, ) - go buffer.actor.Start(buffer.ctx) + buffer.actor.Start(buffer.ctx) } func (buffer *DefaultBuffer) Add(requestId string, algorithmName string, request *models.AlgorithmRequest, config *v1.MachineLearningAlgorithmSpec) error { diff --git a/pkg/checkpoint/request/checkpoint_store.go b/pkg/checkpoint/request/checkpoint_store.go index 83f99a5..bd7ae19 100644 --- a/pkg/checkpoint/request/checkpoint_store.go +++ b/pkg/checkpoint/request/checkpoint_store.go @@ -22,7 +22,7 @@ func (cqls *CqlStore) UpsertCheckpoint(checkpoint *models.CheckpointedRequest) e cloned.LastModified = time.Now() - var query = cqls.cqlSession.Query(models.CheckpointedRequestTable.Insert()).BindStruct(*cloned) + var query = cqls.cqlSession.Query(models.CheckpointedRequestTable.Insert()).BindStruct(*cloned.ToCqlModel()) if err := query.ExecRelease(); err != nil { cqls.logger.V(1).Error(err, "Error when inserting a checkpoint", "algorithm", checkpoint.Algorithm, "id", checkpoint.Id) return err diff --git a/pkg/pipeline/service_pipeline.go b/pkg/pipeline/service_pipeline.go index b475429..a01e0d0 100644 --- a/pkg/pipeline/service_pipeline.go +++ b/pkg/pipeline/service_pipeline.go @@ -50,7 +50,7 @@ func NewDefaultPipelineStageActor[TIn comparable, TOut comparable](failureRateBa func (a *DefaultPipelineStageActor[TIn, TOut]) processNextElement(ctx context.Context, logger klog.Logger, metrics *statsd.Client) bool { element, shutdown := a.queue.Get() - logger.V(0).Info("Starting processing element", "element", element) + logger.V(0).Info("Starting processing element for stage", "element", element, "stage", a.stageName) elementProcessStart := time.Now() if shutdown { @@ -67,6 +67,7 @@ func (a *DefaultPipelineStageActor[TIn, TOut]) processNextElement(ctx context.Co defer telemetry.Gauge(metrics, fmt.Sprintf("%s_queue_size", a.stageName), float64(a.queue.Len()), a.stageTags, 1) result, err := a.processor(element) + logger.V(0).Info("Finished processing element for stage", "element", element, "stage", a.stageName) if err == nil { // If no error occurs then we send the result to the receiver, if one is attached if a.receiver != nil { @@ -76,8 +77,8 @@ func (a *DefaultPipelineStageActor[TIn, TOut]) processNextElement(ctx context.Co } // there was a failure so be sure to report it. This method allows for // pluggable error handling which can be used for things like cluster-monitoring. - utilruntime.HandleErrorWithContext(ctx, err, "Error processing element", "element", element) - logger.V(0).Error(err, "Starting processing element", "element", element) + utilruntime.HandleErrorWithContext(ctx, err, "Error processing element for stage", "element", element, "stage", a.stageName) + logger.V(0).Error(err, "Error when processing element for stage", "element", element, "stage", a.stageName) // forget this submission to prevent clogging the queue a.queue.Forget(element) @@ -95,15 +96,15 @@ func (a *DefaultPipelineStageActor[TIn, TOut]) Start(ctx context.Context) { logger := klog.FromContext(ctx) - logger.V(4).Info("Started workers") + logger.V(0).Info("Starting workers for stage", "stage", a.stageName) for i := 0; i < a.workers; i++ { go wait.UntilWithContext(ctx, a.runActor, time.Second) } - logger.V(4).Info("Started actor workers") + logger.V(0).Info("Started workers for stage", "workers", a.workers, "stage", a.stageName) <-ctx.Done() - logger.V(4).Info("Shutting down actor workers") + logger.V(0).Info("Shutting down workers for stage", "stage", a.stageName) } func (a *DefaultPipelineStageActor[TIn, TOut]) Receive(element TIn) {