Skip to content

Commit

Permalink
[AWS] Check compatibility version of the DB schema on startup (#459)
Browse files Browse the repository at this point in the history
The version is written at schema creation, and checked on startup. If they are different, it fails. Towards #450.
  • Loading branch information
mhutchinson authored Jan 23, 2025
1 parent cc87631 commit c78d54b
Showing 1 changed file with 43 additions and 1 deletion.
44 changes: 43 additions & 1 deletion storage/aws/aws.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,15 @@ const (

DefaultPushbackMaxOutstanding = 4096
DefaultIntegrationSizeLimit = 5 * 4096

// SchemaCompatibilityVersion represents the expected version (e.g. layout & serialisation) of stored data.
//
// A binary built with a given version of the Tessera library is compatible with stored data created by a different version
// of the library if and only if this value is the same as the compatibilityVersion stored in the Tessera table.
//
// NOTE: if changing this version, you need to consider whether end-users are going to update their schema instances to be
// compatible with the new format, and provide a means to do it if so.
SchemaCompatibilityVersion = 1
)

// Storage is an AWS based storage implementation for Tessera.
Expand Down Expand Up @@ -564,14 +573,35 @@ func newMySQLSequencer(ctx context.Context, dsn string, maxOutstanding uint64, m
if err := r.initDB(ctx); err != nil {
return nil, fmt.Errorf("failed to initDB: %v", err)
}
if err := r.checkDataCompatibility(ctx); err != nil {
return nil, fmt.Errorf("schema is not compatible with this version of the Tessera library: %v", err)
}
return r, nil
}

// checkDataCompatibility compares the Tessera library SchemaCompatibilityVersion with the one stored in the
// database, and returns an error if they are not identical.
func (s *mySQLSequencer) checkDataCompatibility(ctx context.Context) error {
row := s.dbPool.QueryRowContext(ctx, "SELECT compatibilityVersion FROM Tessera WHERE id = 0")
var gotVersion uint64
if err := row.Scan(&gotVersion); err != nil {
return fmt.Errorf("failed to read schema compatibility version from DB: %v", err)
}

if gotVersion != SchemaCompatibilityVersion {
return fmt.Errorf("schema compatibilityVersion (%d) != library compatibilityVersion (%d)", gotVersion, SchemaCompatibilityVersion)
}
return nil
}

// initDB ensures that the coordination DB is initialised correctly.
//
// It creates tables if they don't exist already, and inserts zero values.
//
// The database schema consists of 3 tables:
// The database schema consists of 4 tables:
// - Tessera
// This table only ever contains a single row which tracks the compatibility
// version of the DB schema and data formats.
// - SeqCoord
// This table only ever contains a single row which tracks the next available
// sequence number.
Expand All @@ -583,6 +613,14 @@ func newMySQLSequencer(ctx context.Context, dsn string, maxOutstanding uint64, m
// This table coordinates integration of the batches of entries stored in
// Seq into the committed tree state.
func (s *mySQLSequencer) initDB(ctx context.Context) error {
if _, err := s.dbPool.ExecContext(ctx,
`CREATE TABLE IF NOT EXISTS Tessera (
id INT UNSIGNED NOT NULL,
compatibilityVersion BIGINT UNSIGNED NOT NULL,
PRIMARY KEY (id)
)`); err != nil {
return err
}
if _, err := s.dbPool.ExecContext(ctx,
`CREATE TABLE IF NOT EXISTS SeqCoord(
id INT UNSIGNED NOT NULL,
Expand Down Expand Up @@ -616,6 +654,10 @@ func (s *mySQLSequencer) initDB(ctx context.Context) error {
// sequencing and integration to occur.
// Note that this will only succeed if no row exists, so there's no danger
// of "resetting" an existing log.
if _, err := s.dbPool.ExecContext(ctx,
`INSERT IGNORE INTO Tessera (id, compatibilityVersion) VALUES (0, ?)`, SchemaCompatibilityVersion); err != nil {
return err
}
if _, err := s.dbPool.ExecContext(ctx,
`INSERT IGNORE INTO SeqCoord (id, next) VALUES (0, 0)`); err != nil {
return err
Expand Down

0 comments on commit c78d54b

Please sign in to comment.