diff --git a/pkg/checkpoint/models/checkpointed_request.go b/pkg/checkpoint/models/checkpointed_request.go index 87d4675..3f6192d 100644 --- a/pkg/checkpoint/models/checkpointed_request.go +++ b/pkg/checkpoint/models/checkpointed_request.go @@ -50,6 +50,29 @@ type CheckpointedRequest struct { ParentJob ParentJobReference `json:"parent_job"` } +type CheckpointedRequestCqlModel struct { + Algorithm string + Id string + LifecycleStage string + PayloadUri string + ResultUri string + AlgorithmFailureCode string + AlgorithmFailureCause string + AlgorithmFailureDetails string + ReceivedByHost string + ReceivedAt time.Time + SentAt time.Time + AppliedConfiguration string + ConfigurationOverrides string + MonitoringMetadata string + ContentHash string + LastModified time.Time + Tag string + ApiVersion string + JobUid string + ParentJob string +} + var CheckpointedRequestTable = table.New(table.Metadata{ Name: "checkpoints", Columns: []string{ @@ -80,6 +103,35 @@ var CheckpointedRequestTable = table.New(table.Metadata{ SortKey: []string{}, }) +func (cr *CheckpointedRequest) ToCqlModel() *CheckpointedRequestCqlModel { + serializedConfig, _ := json.Marshal(cr.AppliedConfiguration) + serializedOverrides, _ := json.Marshal(cr.ConfigurationOverrides) + serializedMetadata, _ := json.Marshal(cr.MonitoringMetadata) + + return &CheckpointedRequestCqlModel{ + Algorithm: cr.Algorithm, + Id: cr.Id, + LifecycleStage: cr.LifecycleStage, + PayloadUri: cr.PayloadUri, + ResultUri: cr.ResultUri, + AlgorithmFailureCode: cr.AlgorithmFailureCode, + AlgorithmFailureCause: cr.AlgorithmFailureCause, + AlgorithmFailureDetails: cr.AlgorithmFailureDetails, + ReceivedByHost: cr.ReceivedByHost, + ReceivedAt: cr.ReceivedAt, + SentAt: cr.SentAt, + AppliedConfiguration: string(serializedConfig), + ConfigurationOverrides: string(serializedOverrides), + MonitoringMetadata: string(serializedMetadata), + ContentHash: cr.ContentHash, + LastModified: cr.LastModified, + Tag: cr.Tag, + ApiVersion: cr.ApiVersion, + JobUid: cr.JobUid, + ParentJob: "", // TODO: fixme + } +} + func FromAlgorithmRequest(requestId string, algorithmName string, request *AlgorithmRequest, config *v1.MachineLearningAlgorithmSpec) (*CheckpointedRequest, []byte, error) { hostname, _ := os.Hostname() serializedPayload, err := json.Marshal(request.AlgorithmParameters) diff --git a/pkg/checkpoint/payload/s3_payload_store.go b/pkg/checkpoint/payload/s3_payload_store.go index d5038c0..33e19e1 100644 --- a/pkg/checkpoint/payload/s3_payload_store.go +++ b/pkg/checkpoint/payload/s3_payload_store.go @@ -53,8 +53,8 @@ func NewS3PayloadStore(ctx context.Context, logger klog.Logger) *S3PayloadStore func parsePath(blobPath string) *S3Path { r, _ := regexp.Compile(s3UrlRegex) - matches := r.FindAllString(blobPath, -1) - return NewS3Path(matches[0], matches[1]) + matches := r.FindStringSubmatch(blobPath) + return NewS3Path(matches[1], matches[2]) } func (store *S3PayloadStore) SaveTextAsBlob(ctx context.Context, text string, blobPath string) error { @@ -69,7 +69,7 @@ func (store *S3PayloadStore) SaveTextAsBlob(ctx context.Context, text string, bl store.logger.V(0).Error(err, "Error when persisting payload into S3") return err } - store.logger.V(4).Info("Successfully persisted algorithm payload", "payloadPath", blobPath, "checksum", *result.ChecksumSHA1) + store.logger.V(4).Info("Successfully persisted algorithm payload", "payloadPath", blobPath, "etag", *result.ETag) return nil } diff --git a/pkg/checkpoint/request/buffer.go b/pkg/checkpoint/request/buffer.go index 5b7c187..4b99d4a 100644 --- a/pkg/checkpoint/request/buffer.go +++ b/pkg/checkpoint/request/buffer.go @@ -11,7 +11,6 @@ import ( "github.com/SneaksAndData/nexus-core/pkg/telemetry" "k8s.io/apimachinery/pkg/types" "k8s.io/klog/v2" - "path" "time" ) @@ -99,7 +98,7 @@ func (buffer *DefaultBuffer) Start(submitter pipeline.StageActor[*BufferOutput, submitter, ) - buffer.actor.Start(buffer.ctx) + go buffer.actor.Start(buffer.ctx) } func (buffer *DefaultBuffer) Add(requestId string, algorithmName string, request *models.AlgorithmRequest, config *v1.MachineLearningAlgorithmSpec) error { @@ -116,7 +115,7 @@ func (buffer *DefaultBuffer) bufferRequest(input *BufferInput) (*BufferOutput, e telemetry.Increment(buffer.metrics, "incoming_requests", input.tags()) buffer.logger.V(4).Info("Persisting payload", "request", input.Checkpoint.Id, "algorithm", input.Checkpoint.Algorithm) - payloadPath := path.Join( + payloadPath := fmt.Sprintf("%s/%s/%s", buffer.bufferConfig.PayloadStoragePath, fmt.Sprintf("algorithm=%s", input.Checkpoint.Algorithm), input.Checkpoint.Id) diff --git a/pkg/checkpoint/request/checkpoint_store.go b/pkg/checkpoint/request/checkpoint_store.go index fc5a92a..83f99a5 100644 --- a/pkg/checkpoint/request/checkpoint_store.go +++ b/pkg/checkpoint/request/checkpoint_store.go @@ -22,7 +22,7 @@ func (cqls *CqlStore) UpsertCheckpoint(checkpoint *models.CheckpointedRequest) e cloned.LastModified = time.Now() - var query = cqls.cqlSession.Query(models.CheckpointedRequestTable.Insert()).BindStruct(*checkpoint) + var query = cqls.cqlSession.Query(models.CheckpointedRequestTable.Insert()).BindStruct(*cloned) if err := query.ExecRelease(); err != nil { cqls.logger.V(1).Error(err, "Error when inserting a checkpoint", "algorithm", checkpoint.Algorithm, "id", checkpoint.Id) return err diff --git a/pkg/util/generics.go b/pkg/util/generics.go index 76ff0fd..96a8f1d 100644 --- a/pkg/util/generics.go +++ b/pkg/util/generics.go @@ -79,7 +79,7 @@ func DeepClone[T any](obj T) (*T, error) { return nil, err } cloned := new(T) - if err = json.Unmarshal(serialized, *cloned); err != nil { + if err = json.Unmarshal(serialized, cloned); err != nil { return nil, err }