From d7f69f7d415f5a83230ac44e340830ebabf46d4a Mon Sep 17 00:00:00 2001 From: Jinghan Ying Date: Fri, 14 Jan 2022 09:29:57 +0800 Subject: [PATCH] feat(pkg/oomstore): implement new API Export --- pkg/oomstore/export.go | 74 +++++++++++++++++++++++++++++++++-- pkg/oomstore/types/options.go | 6 +++ 2 files changed, 76 insertions(+), 4 deletions(-) diff --git a/pkg/oomstore/export.go b/pkg/oomstore/export.go index 1532b78ba..7f9a251ce 100644 --- a/pkg/oomstore/export.go +++ b/pkg/oomstore/export.go @@ -105,19 +105,19 @@ func (s *OomStore) ChannelExportStream(ctx context.Context, opt types.ChannelExp FeatureFullNames: &opt.FeatureFullNames, }) if err != nil { - return nil, errdefs.WithStack(err) + return nil, err } if len(features.GroupIDs()) != 1 { - return nil, fmt.Errorf("expected 1 group, got %d groups", len(features.GroupIDs())) + return nil, errdefs.Errorf("expected 1 group, got %d groups", len(features.GroupIDs())) } group := features[0].Group revisions, err := s.ListRevision(ctx, &group.ID) if err != nil { - return nil, errdefs.WithStack(err) + return nil, err } revision := revisions.Before(opt.UnixMilli) if revision == nil { - return nil, fmt.Errorf("no feature values up to %d, use a later timestamp", opt.UnixMilli) + return nil, errdefs.Errorf("no feature values up to %d, use a later timestamp", opt.UnixMilli) } if revision.SnapshotTable == "" { if err = s.Snapshot(ctx, group.Name); err != nil { @@ -139,3 +139,69 @@ func (s *OomStore) ChannelExportStream(ctx context.Context, opt types.ChannelExp header := append([]string{group.Entity.Name}, features.Names()...) return types.NewExportResult(header, stream, exportErr), nil } + +// ChannelExport exports the latest streaming feature values up to the given timestamp. +// Currently, this API can only export features in one feature group. +func (s *OomStore) ChannelExport(ctx context.Context, opt types.ChannelExportOpt) (*types.ExportResult, error) { + if err := validateFeatureFullNames(opt.FeatureFullNames); err != nil { + return nil, err + } + features, err := s.ListFeature(ctx, types.ListFeatureOpt{ + FeatureFullNames: &opt.FeatureFullNames, + }) + if err != nil { + return nil, err + } + + featureMap := buildGroupIDToFeaturesMap(features) + snapshotTables := make(map[int]string) + cdcTables := make(map[int]string) + for _, featureList := range featureMap { + group := featureList[0].Group + revisions, err := s.ListRevision(ctx, &group.ID) + if err != nil { + return nil, err + } + revision := revisions.Before(opt.UnixMilli) + if revision == nil { + return nil, errdefs.Errorf("no feature values up to %d, use a later timestamp", opt.UnixMilli) + } + if revision.SnapshotTable == "" { + if err = s.Snapshot(ctx, group.Name); err != nil { + return nil, err + } + } + revision, err = s.GetRevision(ctx, revision.ID) + if err != nil { + return nil, err + } + snapshotTables[group.ID] = revision.SnapshotTable + if group.Category == types.CategoryStream { + cdcTables[group.ID] = revision.CdcTable + } + } + + stream, exportErr := s.offline.Export(ctx, offline.ExportOpt{ + SnapshotTables: snapshotTables, + CdcTables: cdcTables, + Features: featureMap, + UnixMilli: opt.UnixMilli, + EntityName: features[0].Group.Entity.Name, + Limit: opt.Limit, + }) + + header := append([]string{features[0].Group.Entity.Name}, features.Names()...) + return types.NewExportResult(header, stream, exportErr), nil +} + +// key: group_Id, value: slice of features +func buildGroupIDToFeaturesMap(features types.FeatureList) map[int]types.FeatureList { + groups := make(map[int]types.FeatureList) + for _, f := range features { + if _, ok := groups[f.Group.ID]; !ok { + groups[f.Group.ID] = types.FeatureList{} + } + groups[f.Group.ID] = append(groups[f.Group.ID], f) + } + return groups +} diff --git a/pkg/oomstore/types/options.go b/pkg/oomstore/types/options.go index c2974a611..ad0488821 100644 --- a/pkg/oomstore/types/options.go +++ b/pkg/oomstore/types/options.go @@ -30,6 +30,12 @@ type CreateGroupOpt struct { Description string } +type ChannelExportOpt struct { + FeatureFullNames []string + UnixMilli int64 + Limit *uint64 +} + type ChannelExportBatchOpt struct { RevisionID int FeatureNames []string