Update logging and fix Cassandra serialization (#17)
george-zubrienko authored Feb 27, 2025
1 parent e3a75e2 commit 21d33cf
Showing 7 changed files with 15 additions and 15 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/create-gh-release.yaml
@@ -16,4 +16,4 @@ jobs:
       uses: SneaksAndData/github-actions/[email protected]
       with:
         major_v: 0
-        minor_v: 2
+        minor_v: 3
2 changes: 1 addition & 1 deletion pkg/checkpoint/models/algorithm_request.go
@@ -13,5 +13,5 @@ type AlgorithmRequest struct {
 	MonitoringMetadata map[string][]string     `json:"monitoringMetadata,omitempty"`
 	RequestApiVersion  string                  `json:"requestApiVersion,omitempty"`
 	Tag                string                  `json:"tag,omitempty"`
-	ParentRequest      ParentAlgorithmRequest  `json:"parentRequest,omitempty"`
+	ParentRequest      *ParentAlgorithmRequest `json:"parentRequest,omitempty"`
 }
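
Why the pointer matters: encoding/json never treats a non-pointer struct value as empty, so `omitempty` on the old field was a no-op and an absent parent still serialized as a nested object of zero values. A nil pointer, by contrast, is omitted. A minimal sketch (the `ParentAlgorithmRequest` shape here is assumed for illustration):

package main

import (
	"encoding/json"
	"fmt"
)

// Assumed minimal shape, for illustration only.
type ParentAlgorithmRequest struct {
	RequestId string `json:"requestId,omitempty"`
}

type withValue struct {
	ParentRequest ParentAlgorithmRequest `json:"parentRequest,omitempty"`
}

type withPointer struct {
	ParentRequest *ParentAlgorithmRequest `json:"parentRequest,omitempty"`
}

func main() {
	v, _ := json.Marshal(withValue{})
	p, _ := json.Marshal(withPointer{})
	fmt.Println(string(v)) // {"parentRequest":{}} — omitempty has no effect on struct values
	fmt.Println(string(p)) // {} — a nil pointer is omitted
}
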
7 changes: 3 additions & 4 deletions pkg/checkpoint/models/checkpointed_request.go
@@ -64,7 +64,7 @@ type CheckpointedRequestCqlModel struct {
 	SentAt                 time.Time
 	AppliedConfiguration   string
 	ConfigurationOverrides string
-	MonitoringMetadata     string
+	MonitoringMetadata     map[string][]string
 	ContentHash            string
 	LastModified           time.Time
 	Tag                    string
@@ -74,7 +74,7 @@ type CheckpointedRequestCqlModel struct {
 }

 var CheckpointedRequestTable = table.New(table.Metadata{
-	Name: "checkpoints",
+	Name: "nexus.checkpoints",
 	Columns: []string{
 		"algorithm",
 		"id",
@@ -106,7 +106,6 @@ var CheckpointedRequestTable = table.New(table.Metadata{
 func (cr *CheckpointedRequest) ToCqlModel() *CheckpointedRequestCqlModel {
 	serializedConfig, _ := json.Marshal(cr.AppliedConfiguration)
 	serializedOverrides, _ := json.Marshal(cr.ConfigurationOverrides)
-	serializedMetadata, _ := json.Marshal(cr.MonitoringMetadata)

 	return &CheckpointedRequestCqlModel{
 		Algorithm: cr.Algorithm,
@@ -122,7 +121,7 @@ func (cr *CheckpointedRequest) ToCqlModel() *CheckpointedRequestCqlModel {
 		SentAt:                 cr.SentAt,
 		AppliedConfiguration:   string(serializedConfig),
 		ConfigurationOverrides: string(serializedOverrides),
-		MonitoringMetadata:     string(serializedMetadata),
+		MonitoringMetadata:     cr.MonitoringMetadata,
 		ContentHash:            cr.ContentHash,
 		LastModified:           cr.LastModified,
 		Tag:                    cr.Tag,
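
The serialization fix drops the JSON round-trip for `MonitoringMetadata`: gocqlx can bind a Go `map[string][]string` directly to a Cassandra map column (e.g. `map<text, frozen<list<text>>>`). A minimal sketch of the pattern, using a hypothetical `nexus.example_rows` table — the column names and schema here are assumptions, not the project's actual schema:

package main

import (
	"log"

	"github.com/gocql/gocql"
	"github.com/scylladb/gocqlx/v2"
	"github.com/scylladb/gocqlx/v2/table"
)

// Row mirrors the shape of the change: the map field binds straight to a
// Cassandra map column, with no json.Marshal round-trip.
type Row struct {
	Algorithm          string
	Id                 string
	MonitoringMetadata map[string][]string
}

// Keyspace-qualified name, matching the "nexus.checkpoints" fix above.
var rowTable = table.New(table.Metadata{
	Name:    "nexus.example_rows", // hypothetical table
	Columns: []string{"algorithm", "id", "monitoring_metadata"},
	PartKey: []string{"algorithm"},
	SortKey: []string{"id"},
})

func main() {
	cluster := gocql.NewCluster("127.0.0.1") // assumed local node
	session, err := gocqlx.WrapSession(cluster.CreateSession())
	if err != nil {
		log.Fatal(err)
	}
	defer session.Close()

	row := Row{
		Algorithm:          "demo",
		Id:                 "run-1",
		MonitoringMetadata: map[string][]string{"owner": {"team-a"}},
	}
	// BindStruct maps exported fields to columns (snake_case by default).
	if err := session.Query(rowTable.Insert()).BindStruct(row).ExecRelease(); err != nil {
		log.Fatal(err)
	}
}
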
2 changes: 1 addition & 1 deletion pkg/checkpoint/models/submission_buffer_entry.go
@@ -7,7 +7,7 @@ import (
 )

 var SubmissionBufferTable = table.New(table.Metadata{
-	Name: "submission_buffer",
+	Name: "nexus.submission_buffer",
 	Columns: []string{
 		"algorithm",
 		"id",
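
Qualifying the table name with the keyspace (`nexus.submission_buffer` rather than `submission_buffer`) bakes the keyspace into every statement gocqlx generates, so statements resolve even when the session is not pinned to the `nexus` keyspace. A quick sketch showing the generated CQL — the two-column metadata is illustrative only:

package main

import (
	"fmt"

	"github.com/scylladb/gocqlx/v2/table"
)

func main() {
	tbl := table.New(table.Metadata{
		Name:    "nexus.submission_buffer", // keyspace-qualified, as in the fix
		Columns: []string{"algorithm", "id"},
		PartKey: []string{"algorithm"},
		SortKey: []string{"id"},
	})

	// Insert returns the statement and the bind-marker names.
	stmt, names := tbl.Insert()
	fmt.Println(stmt)  // e.g. INSERT INTO nexus.submission_buffer (algorithm,id) VALUES (?,?)
	fmt.Println(names) // [algorithm id]
}
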
2 changes: 1 addition & 1 deletion pkg/checkpoint/request/buffer.go
@@ -98,7 +98,7 @@ func (buffer *DefaultBuffer) Start(submitter pipeline.StageActor[*BufferOutput,
 		submitter,
 	)

-	go buffer.actor.Start(buffer.ctx)
+	buffer.actor.Start(buffer.ctx)
 }

 func (buffer *DefaultBuffer) Add(requestId string, algorithmName string, request *models.AlgorithmRequest, config *v1.MachineLearningAlgorithmSpec) error {
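
Dropping the `go` keyword means `DefaultBuffer.Start` now runs the actor on the calling goroutine; since the actor's own `Start` blocks on `<-ctx.Done()` (see `service_pipeline.go` below), the buffer's `Start` now blocks until shutdown instead of returning immediately. A minimal sketch of the difference, with types simplified and invented for illustration:

package main

import (
	"context"
	"fmt"
	"time"
)

type actor struct{}

// Start launches workers and then blocks until the context is cancelled,
// mirroring the shape of DefaultPipelineStageActor.Start.
func (a *actor) Start(ctx context.Context) {
	go func() {
		for {
			select {
			case <-ctx.Done():
				return
			default:
				fmt.Println("working")
				time.Sleep(100 * time.Millisecond)
			}
		}
	}()
	<-ctx.Done()
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 300*time.Millisecond)
	defer cancel()

	a := &actor{}
	// Blocks until ctx is done. With `go a.Start(ctx)`, main could return
	// before any work ran.
	a.Start(ctx)
	fmt.Println("shut down")
}
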
2 changes: 1 addition & 1 deletion pkg/checkpoint/request/checkpoint_store.go
@@ -22,7 +22,7 @@ func (cqls *CqlStore) UpsertCheckpoint(checkpoint *models.CheckpointedRequest) e

 	cloned.LastModified = time.Now()

-	var query = cqls.cqlSession.Query(models.CheckpointedRequestTable.Insert()).BindStruct(*cloned)
+	var query = cqls.cqlSession.Query(models.CheckpointedRequestTable.Insert()).BindStruct(*cloned.ToCqlModel())
 	if err := query.ExecRelease(); err != nil {
 		cqls.logger.V(1).Error(err, "Error when inserting a checkpoint", "algorithm", checkpoint.Algorithm, "id", checkpoint.Id)
 		return err
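
The bug here was binding the domain struct instead of its CQL projection: `BindStruct` maps struct fields onto the table's columns, so fields whose Go types or names do not match the column definitions produce wrong or failing inserts. Converting with `ToCqlModel()` first keeps the two representations aligned. A simplified sketch of the convert-then-bind pattern, with types invented for illustration:

package main

import (
	"encoding/json"
	"fmt"
)

// Domain model with rich Go types.
type Checkpoint struct {
	Id     string
	Config map[string]string
}

// Column-shaped model: what actually gets bound to the CQL statement.
type CheckpointCql struct {
	Id     string
	Config string // JSON-encoded text column
}

// ToCqlModel projects the domain model onto the table's column types.
func (c *Checkpoint) ToCqlModel() *CheckpointCql {
	cfg, _ := json.Marshal(c.Config)
	return &CheckpointCql{Id: c.Id, Config: string(cfg)}
}

func main() {
	c := &Checkpoint{Id: "run-1", Config: map[string]string{"retries": "3"}}
	// In the real code this value is what gets passed to BindStruct:
	// session.Query(tbl.Insert()).BindStruct(*c.ToCqlModel())
	fmt.Printf("%+v\n", *c.ToCqlModel())
}
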
13 changes: 7 additions & 6 deletions pkg/pipeline/service_pipeline.go
@@ -50,7 +50,7 @@ func NewDefaultPipelineStageActor[TIn comparable, TOut comparable](failureRateBa

 func (a *DefaultPipelineStageActor[TIn, TOut]) processNextElement(ctx context.Context, logger klog.Logger, metrics *statsd.Client) bool {
 	element, shutdown := a.queue.Get()
-	logger.V(0).Info("Starting processing element", "element", element)
+	logger.V(0).Info("Starting processing element for stage", "element", element, "stage", a.stageName)
 	elementProcessStart := time.Now()

 	if shutdown {
@@ -67,6 +67,7 @@ func (a *DefaultPipelineStageActor[TIn, TOut]) processNextElement(ctx context.Co
 	defer telemetry.Gauge(metrics, fmt.Sprintf("%s_queue_size", a.stageName), float64(a.queue.Len()), a.stageTags, 1)

 	result, err := a.processor(element)
+	logger.V(0).Info("Finished processing element for stage", "element", element, "stage", a.stageName)
 	if err == nil {
 		// If no error occurs then we send the result to the receiver, if one is attached
 		if a.receiver != nil {
@@ -76,8 +77,8 @@ func (a *DefaultPipelineStageActor[TIn, TOut]) processNextElement(ctx context.Co
 	}
 	// there was a failure so be sure to report it. This method allows for
 	// pluggable error handling which can be used for things like cluster-monitoring.
-	utilruntime.HandleErrorWithContext(ctx, err, "Error processing element", "element", element)
-	logger.V(0).Error(err, "Starting processing element", "element", element)
+	utilruntime.HandleErrorWithContext(ctx, err, "Error processing element for stage", "element", element, "stage", a.stageName)
+	logger.V(0).Error(err, "Error when processing element for stage", "element", element, "stage", a.stageName)

 	// forget this submission to prevent clogging the queue
 	a.queue.Forget(element)
@@ -95,15 +96,15 @@ func (a *DefaultPipelineStageActor[TIn, TOut]) Start(ctx context.Context) {

 	logger := klog.FromContext(ctx)

-	logger.V(4).Info("Started workers")
+	logger.V(0).Info("Starting workers for stage", "stage", a.stageName)
 	for i := 0; i < a.workers; i++ {
 		go wait.UntilWithContext(ctx, a.runActor, time.Second)
 	}
-	logger.V(4).Info("Started actor workers")
+	logger.V(0).Info("Started workers for stage", "workers", a.workers, "stage", a.stageName)

 	<-ctx.Done()

-	logger.V(4).Info("Shutting down actor workers")
+	logger.V(0).Info("Shutting down workers for stage", "stage", a.stageName)
 }

 func (a *DefaultPipelineStageActor[TIn, TOut]) Receive(element TIn) {
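
The logging changes promote the stage lifecycle messages from verbosity 4 to 0 (always emitted) and attach the stage name as a structured key/value pair, so per-stage log lines can be filtered downstream. A minimal klog sketch of the style — the stage name and worker count are placeholders:

package main

import (
	"context"
	"flag"

	"k8s.io/klog/v2"
)

func main() {
	klog.InitFlags(nil)
	flag.Parse()

	ctx := klog.NewContext(context.Background(), klog.Background())
	logger := klog.FromContext(ctx)

	// V(0) always logs; key/value pairs keep the output structured,
	// matching the updated stage messages above.
	logger.V(0).Info("Starting workers for stage", "stage", "buffer", "workers", 4)
}
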
