Skip to content

Commit

Permalink
feat(pkg/oomstore): implement new API Export
Browse files Browse the repository at this point in the history
  • Loading branch information
jinghancc committed Jan 14, 2022
1 parent 3ce0794 commit d7f69f7
Show file tree
Hide file tree
Showing 2 changed files with 76 additions and 4 deletions.
74 changes: 70 additions & 4 deletions pkg/oomstore/export.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,19 +105,19 @@ func (s *OomStore) ChannelExportStream(ctx context.Context, opt types.ChannelExp
FeatureFullNames: &opt.FeatureFullNames,
})
if err != nil {
return nil, errdefs.WithStack(err)
return nil, err
}
if len(features.GroupIDs()) != 1 {
return nil, fmt.Errorf("expected 1 group, got %d groups", len(features.GroupIDs()))
return nil, errdefs.Errorf("expected 1 group, got %d groups", len(features.GroupIDs()))
}
group := features[0].Group
revisions, err := s.ListRevision(ctx, &group.ID)
if err != nil {
return nil, errdefs.WithStack(err)
return nil, err
}
revision := revisions.Before(opt.UnixMilli)
if revision == nil {
return nil, fmt.Errorf("no feature values up to %d, use a later timestamp", opt.UnixMilli)
return nil, errdefs.Errorf("no feature values up to %d, use a later timestamp", opt.UnixMilli)
}
if revision.SnapshotTable == "" {
if err = s.Snapshot(ctx, group.Name); err != nil {
Expand All @@ -139,3 +139,69 @@ func (s *OomStore) ChannelExportStream(ctx context.Context, opt types.ChannelExp
header := append([]string{group.Entity.Name}, features.Names()...)
return types.NewExportResult(header, stream, exportErr), nil
}

// ChannelExport exports the latest streaming feature values up to the given timestamp.
// Currently, this API can only export features in one feature group.
func (s *OomStore) ChannelExport(ctx context.Context, opt types.ChannelExportOpt) (*types.ExportResult, error) {
if err := validateFeatureFullNames(opt.FeatureFullNames); err != nil {
return nil, err
}
features, err := s.ListFeature(ctx, types.ListFeatureOpt{
FeatureFullNames: &opt.FeatureFullNames,
})
if err != nil {
return nil, err
}

featureMap := buildGroupIDToFeaturesMap(features)
snapshotTables := make(map[int]string)
cdcTables := make(map[int]string)
for _, featureList := range featureMap {
group := featureList[0].Group
revisions, err := s.ListRevision(ctx, &group.ID)
if err != nil {
return nil, err
}
revision := revisions.Before(opt.UnixMilli)
if revision == nil {
return nil, errdefs.Errorf("no feature values up to %d, use a later timestamp", opt.UnixMilli)
}
if revision.SnapshotTable == "" {
if err = s.Snapshot(ctx, group.Name); err != nil {
return nil, err
}
}
revision, err = s.GetRevision(ctx, revision.ID)
if err != nil {
return nil, err
}
snapshotTables[group.ID] = revision.SnapshotTable
if group.Category == types.CategoryStream {
cdcTables[group.ID] = revision.CdcTable
}
}

stream, exportErr := s.offline.Export(ctx, offline.ExportOpt{
SnapshotTables: snapshotTables,
CdcTables: cdcTables,
Features: featureMap,
UnixMilli: opt.UnixMilli,
EntityName: features[0].Group.Entity.Name,
Limit: opt.Limit,
})

header := append([]string{features[0].Group.Entity.Name}, features.Names()...)
return types.NewExportResult(header, stream, exportErr), nil
}

// key: group_Id, value: slice of features
func buildGroupIDToFeaturesMap(features types.FeatureList) map[int]types.FeatureList {
groups := make(map[int]types.FeatureList)
for _, f := range features {
if _, ok := groups[f.Group.ID]; !ok {
groups[f.Group.ID] = types.FeatureList{}
}
groups[f.Group.ID] = append(groups[f.Group.ID], f)
}
return groups
}
6 changes: 6 additions & 0 deletions pkg/oomstore/types/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,12 @@ type CreateGroupOpt struct {
Description string
}

type ChannelExportOpt struct {
FeatureFullNames []string
UnixMilli int64
Limit *uint64
}

type ChannelExportBatchOpt struct {
RevisionID int
FeatureNames []string
Expand Down

0 comments on commit d7f69f7

Please sign in to comment.