Skip to content

Commit

Permalink
Merge pull request #1070 from oom-ai/feat/udpate-revision
Browse files Browse the repository at this point in the history
Feat/update revision
  • Loading branch information
lianxmfor authored Jan 24, 2022
2 parents 4a3a7c3 + 12aa995 commit 58d0eeb
Show file tree
Hide file tree
Showing 4 changed files with 36 additions and 12 deletions.
3 changes: 3 additions & 0 deletions internal/database/metadata/sqlutil/revision.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,9 @@ func UpdateRevision(ctx context.Context, sqlxCtx metadata.SqlxContext, opt metad
if opt.NewSnapshotTable != nil {
and["snapshot_table"] = *opt.NewSnapshotTable
}
if opt.NewCdcTable != nil {
and["cdc_table"] = *opt.NewCdcTable
}
cond, args, err := dbutil.BuildConditions(and, nil)
if err != nil {
return err
Expand Down
24 changes: 22 additions & 2 deletions internal/database/metadata/test_impl/revision.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,8 @@ func TestUpdateRevision(t *testing.T, prepareStore PrepareStoreFn, destroyStore
revisionID, _, err := store.CreateRevision(ctx, metadata.CreateRevisionOpt{
Revision: 1000,
GroupID: groupID,
SnapshotTable: stringPtr("device_info_1000"),
SnapshotTable: stringPtr("offline_stream_snapshot_device_info_1000"),
CdcTable: stringPtr("offline_stream_cdc_device_info_1000"),
Anchored: false,
})
require.NoError(t, err)
Expand All @@ -129,7 +130,8 @@ func TestUpdateRevision(t *testing.T, prepareStore PrepareStoreFn, destroyStore
ID: revisionID,
GroupID: groupID,
Revision: 1000,
SnapshotTable: "device_info_1000",
SnapshotTable: "offline_stream_snapshot_device_info_1000",
CdcTable: "offline_stream_cdc_device_info_1000",
Anchored: true,
Group: group,
},
Expand All @@ -146,6 +148,24 @@ func TestUpdateRevision(t *testing.T, prepareStore PrepareStoreFn, destroyStore
GroupID: groupID,
Revision: 1000,
SnapshotTable: "new_table",
CdcTable: "offline_stream_cdc_device_info_1000",
Anchored: true,
Group: group,
},
},
{
description: "update revision cdc_table successfully",
opt: metadata.UpdateRevisionOpt{
RevisionID: revisionID,
NewCdcTable: stringPtr("new_table"),
},
expected: nil,
expectedRevision: &types.Revision{
ID: revisionID,
GroupID: groupID,
Revision: 1000,
SnapshotTable: "new_table",
CdcTable: "new_table",
Anchored: true,
Group: group,
},
Expand Down
1 change: 1 addition & 0 deletions internal/database/metadata/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ type UpdateRevisionOpt struct {
NewRevision *int64
NewAnchored *bool
NewSnapshotTable *string
NewCdcTable *string
}

type ListFeatureOpt struct {
Expand Down
20 changes: 10 additions & 10 deletions pkg/oomstore/revision.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,26 +85,26 @@ func (s *OomStore) createSnapshotAndCdcTable(ctx context.Context, revision *type
return err
}

// Update snapshot_table in feature_group_revision table
if err := s.metadata.UpdateRevision(ctx, metadata.UpdateRevisionOpt{
RevisionID: revision.ID,
NewSnapshotTable: &snapshotTable,
}); err != nil {
return err
}

var cdcTable *string
if revision.Group.Category == types.CategoryStream {
tableName := dbutil.OfflineStreamCdcTableName(revision.GroupID, revision.Revision)
if err = s.offline.CreateTable(ctx, offline.CreateTableOpt{
TableName: dbutil.OfflineStreamCdcTableName(revision.GroupID, revision.Revision),
TableName: tableName,
Entity: revision.Group.Entity,
Features: features,
TableType: types.TableStreamCdc,
}); err != nil {
return err
}
cdcTable = &tableName
}

return nil
// Update snapshot_table in feature_group_revision table
return s.metadata.UpdateRevision(ctx, metadata.UpdateRevisionOpt{
RevisionID: revision.ID,
NewSnapshotTable: &snapshotTable,
NewCdcTable: cdcTable,
})
}

func (s *OomStore) createFirstSnapshotTable(ctx context.Context, revision *types.Revision) error {
Expand Down

0 comments on commit 58d0eeb

Please sign in to comment.