From 7272b04db76507a98e07e96dc88e2c1a54283693 Mon Sep 17 00:00:00 2001 From: Tom van Bussel Date: Tue, 23 May 2023 11:36:48 +0200 Subject: [PATCH] Protocol Specification for Row Commit Versions This PR adds the protocol specification changes for the Row Commit Versions that are proposed in https://github.com/delta-io/delta/issues/1715. In particular it makes the following changes: - Renames the rowIds feature to rowTracking. - Renames the delta.enableRowIds property to delta.enableRowTracking. - Renames and moves the preservedRowIds flag in rowIdHighWaterMark to delta.rowTracking.preserved in the tags of commitInfo. - Refactors the specification of Row IDs - Adds the specification for Row Commit Versions. Closes delta-io/delta#1747 GitOrigin-RevId: ac774c4b92c53643d9f4f5b174270a94ab71e1e1 --- PROTOCOL.md | 171 +++++++++++++++++++++++++++++++++++----------------- 1 file changed, 117 insertions(+), 54 deletions(-) diff --git a/PROTOCOL.md b/PROTOCOL.md index abfd4de21e0..1036ac42186 100644 --- a/PROTOCOL.md +++ b/PROTOCOL.md @@ -23,28 +23,32 @@ - [Transaction Identifiers](#transaction-identifiers) - [Protocol Evolution](#protocol-evolution) - [Commit Provenance Information](#commit-provenance-information) - - [Increase Row ID High-Water Mark](#increase-row-id-high-watermark) + - [Increase Row ID High-water Mark](#increase-row-id-high-water-mark) - [Domain Metadata](#domain-metadata) - [Reader Requirements for Domain Metadata](#reader-requirements-for-domain-metadata) - [Writer Requirements for Domain Metadata](#writer-requirements-for-domain-metadata) - [Action Reconciliation](#action-reconciliation) - [Table Features](#table-features) - - [Table Features for new and Existing Tables](#table-features-for-new-and-existing-tables) + - [Table Features for New and Existing Tables](#table-features-for-new-and-existing-tables) - [Enabled Features](#enabled-features) - [Disabled Features](#disabled-features) - [Column Mapping](#column-mapping) - [Writer Requirements for Column Mapping](#writer-requirements-for-column-mapping) - [Reader Requirements for Column Mapping](#reader-requirements-for-column-mapping) - [Deletion Vectors](#deletion-vectors) +- [Timestamp without timezone (TimestampNTZ)](#timestamp-without-timezone-timestampntz) - [Deletion Vector Descriptor Schema](#deletion-vector-descriptor-schema) - [Derived Fields](#derived-fields) - [JSON Example 1 — On Disk with Relative Path (with Random Prefix)](#json-example-1--on-disk-with-relative-path-with-random-prefix) - [JSON Example 2 — On Disk with Absolute Path](#json-example-2--on-disk-with-absolute-path) - [JSON Example 3 — Inline](#json-example-3--inline) - [Reader Requirements for Deletion Vectors](#reader-requirements-for-deletion-vectors) -- [Row IDs](#row-ids) - - [Reader Requirements for Row IDs](#reader-requirements-for-row-ids) - - [Writer Requirements for Row IDs](#writer-requirements-for-row-ids) + - [Writer Requirement for Deletion Vectors](#writer-requirement-for-deletion-vectors) +- [Row Tracking](#row-tracking) + - [Row IDs](#row-ids) + - [Row Commit Versions](#row-commit-versions) + - [Reader Requirements for Row Tracking](#reader-requirements-for-row-tracking) + - [Writer Requirements for Row Tracking](#writer-requirements-for-row-tracking) - [Requirements for Writers](#requirements-for-writers) - [Creation of New Log Entries](#creation-of-new-log-entries) - [Consistency Between Table Metadata and Data Files](#consistency-between-table-metadata-and-data-files) @@ -350,6 +354,7 @@ stats | [Statistics Struct](#Per-file-Statistics) | Contains statistics (e.g., c tags | Map[String, String] | Map containing metadata about this logical file | optional deletionVector | [DeletionVectorDescriptor Struct](#Deletion-Vectors) | Either null (or absent in JSON) when no DV is associated with this data file, or a struct (described below) that contains necessary information about the DV that is part of this logical file. | optional baseRowId | Long | Default generated Row ID of the first row in the file. The default generated Row IDs of the other rows in the file can be reconstructed by adding the physical index of the row within the file to the base Row ID. See also [Row IDs](#row-ids) | optional +defaultRowCommitVersion | Long | First commit version in which an `add` action with the same `path` was committed to the table. | optional The following is an example `add` action: ```json @@ -361,6 +366,7 @@ The following is an example `add` action: "modificationTime": 1512909768000, "dataChange": true, "baseRowId": 4071, + "defaultRowCommitVersion": 41, "stats": "{\"numRecords\":1,\"minValues\":{\"val..." } } @@ -379,6 +385,7 @@ size| Long | The size of this data file in bytes | optional tags | Map[String, String] | Map containing metadata about this file | optional deletionVector | [DeletionVectorDescriptor Struct](#Deletion-Vectors) | Either null (or absent in JSON) when no DV is associated with this data file, or a struct (described below) that contains necessary information about the DV that is part of this logical file. | optional baseRowId | Long | Default generated Row ID of the first row in the file. The default generated Row IDs of the other rows in the file can be reconstructed by adding the physical index of the row within the file to the base Row ID. See also [Row IDs](#row-ids) | optional +defaultRowCommitVersion | Long | First commit version in which an `add` action with the same `path` was committed to the table | optional The following is an example `remove` action. ```json @@ -387,6 +394,7 @@ The following is an example `remove` action. "path": "part-00001-9…..snappy.parquet", "deletionTimestamp": 1515488792485, "baseRowId": 4071, + "defaultRowCommitVersion": 41, "dataChange": true } } @@ -554,15 +562,13 @@ The schema of `rowIdHighWaterMark` action is as follows: Field Name | Data Type | Description | optional/required -|-|-|- -highWaterMark | Long | Highest Row ID that has been assigned to a row in the table. | required -preservedRowIds | Boolean | When true, the commit that wrote this high-water mark preserved existing Row IDs of rewritten rows. | required +highWaterMark | Long | The highest Row ID that has been assigned to a row in the table. | required The following is an example `rowIdHighWaterMark` action: ```json { "rowIdHighWaterMark": { - "highWaterMark": 1432, - "preservedRowIds": true + "highWaterMark": 1432 } } ``` @@ -792,7 +798,26 @@ If a snapshot contains logical files with records that are invalidated by a DV, ## Writer Requirement for Deletion Vectors When adding a logical file with a deletion vector, then that logical file must have correct `numRecords` information for the data file in the `stats` field. -# Row IDs +# Row Tracking + +Row Tracking is a feature that allows the tracking of rows across multiple versions of a Delta table. +It enables this by exposing two metadata columns: Row IDs, which uniquely identify a row across multiple versions of a table, +and Row Commit Versions, which make it possible to check whether two rows with the same ID in two different versions of the table represent the same version of the row. + +Row Tracking is defined to be **supported** or **enabled** on a table as follows: +- When the feature `rowTracking` exists in the table `protocol`'s `writerFeatures`, then we say that Row Tracking is **supported**. + In this situation, writers must assign Row IDs and Commit Versions, but they cannot yet be relied upon to be present in the table. + When Row Tracking is supported but not yet enabled writers cannot preserve Row IDs and Commit Versions. +- When additionally the table property `delta.enableRowTracking` is set to `true`, then we say that Row Tracking is **enabled**. + In this situation, Row IDs and Row Commit versions can be relied upon to be present in the table for all rows. + When Row Tracking is enabled writers are expected to preserve Row IDs and Commit Versions. + +Enablement: +- The table must be on Writer Version 7. +- The feature `rowTracking` must exist in the table `protocol`'s `writerFeatures`. +- The table property `delta.enableRowTracking` must be set to `true`. + +## Row IDs Delta provides Row IDs. Row IDs are integers that are used to uniquely identify rows within a table. Every row has two Row IDs: @@ -802,7 +827,9 @@ Every row has two Row IDs: The fresh ID of a row may change every time the table is updated, even for rows that are not modified. E.g. when a row is copied unchanged during an update operation, it will get a new fresh ID. Fresh IDs can be used to identify rows within one version of the table, e.g. for identifying matching rows in self joins. - A **stable Row ID**. This ID uniquely identifies the row across versions of the table and across updates. + When a row is inserted, it is assigned a new stable Row ID that is equal to the fresh Row ID. When a row is updated or copied, the stable Row ID for this row is preserved. + When a row is restored (i.e. the table is restored to an earlier version), its stable Row ID is restored as well. The fresh and stable Row IDs are not required to be equal. @@ -814,67 +841,99 @@ Row IDs are stored in two ways: - **Materialized Row IDs** are stored in a column in the data files. This column is hidden from readers and writers, i.e. it is not part of the `schemaString` in the table's `metaData`. - Instead, the name of this column can be found in the value for the `delta.rowIds.physicalColumnName` key in the `configuration` of the table's `metaData` action. + Instead, the name of this column can be found in the value for the `delta.rowTracking.materializedRowIdColumnName` key in the `configuration` of the table's `metaData` action. This column may contain `null` values meaning that the corresponding row has no materialized Row ID. This column may be omitted if all its values are `null` in the file. Materialized Row IDs provide a mechanism for writers to preserve stable Row IDs for rows that are updated or copied. The fresh Row ID of a row is equal to the default generated Row ID. The stable Row ID of a row is equal to the materialized Row ID of the row when that column is present and the value is not NULL, otherwise it is equal to the default generated Row ID. -Row IDs are defined to be **supported** or **enabled** on a table as follows: -- When the feature `delta.rowIds` exists in the table `protocol`'s `writerFeatures` but the table property `delta.enableRowIds` is not set to true, then we say that row IDs are **supported**. - In this situation, writers must assign new fresh row IDs, but row IDs cannot yet be relied upon to be present in the table. -- When the feature `delta.rowIds` exists in the table `protocol`'s `writerFeatures` and the table property `delta.enableRowIds` is set to true, then we say that row IDs are **enabled**. - In this situation, row IDs can be relied upon to be present in the table for all rows. +When Row Tracking is enabled: +- Default generated Row IDs must be assigned to all existing rows. + This means in particular that all files that are part of the table version that sets the table property `delta.enableRowTracking` to `true` must have `baseRowId` set. + A backfill operation may be required to commit `add` and `remove` actions with the `baseRowId` field set for all data files before the table property `delta.enableRowTracking` can be set to `true`. -Enablement: -- The table must be on Writer Version 7. -- The feature `delta.rowIds` must exist in the table `protocol`'s `writerFeatures`. -- The table property `delta.enableRowIds` must be set to `true`. +## Row Commit Versions -When enabled: -- Default generated Row IDs must be assigned to all existing rows. - This means in particular that all files that are part of the table version that sets the table property `delta.enableRowIds` to `true` must have `baseRowId` set. - A backfill operation may be required to commit `add` and `remove` actions with the `baseRowId` field set for all data files before the table property `delta.enableRowIds` can be set to `true`. +Row Commit Versions provide versioning of rows. -## Reader Requirements for Row IDs +- **Fresh** or unstable **Row Commit Versions** can be used to identify the first commit version in which the `add` action containing the row was committed. + The fresh Commit Version of a row may change every time the table is updated, even for rows that are not modified. E.g. when a row is copied unchanged during an update operation, it will get a new fresh Commit Version. +- **Stable Row Commit Versions** identify the last commit version in which the row (with the same ID) was either inserted or updated. + When a row is inserted or updated, it is assigned the commit version number of the log entry containing the `add` entry with the new row. + When a row is copied, the stable Row Commit Version for this row is preserved. + When a row is restored (i.e. the table is restored to an earlier version), its stable Row Commit Version is restored as well. -When Row IDs are enabled (when the table property `delta.enableRowIds` is set to `true`) and Row IDs are to be read, then: -- When requested, readers must reconstruct stable Row IDs as follows: - 1. Readers must use the materialized Row ID if the physical Row ID column determined by `delta.rowIds.physicalColumnName` is present in the data file and the column contains a non `null` value for a row. - 2. Readers must use the default generated Row ID in all other cases. -- Readers cannot read Row IDs while reading change data files from `cdc` actions. +The fresh and stable Row Commit Versions are not required to be equal. -## Writer Requirements for Row IDs +Commit Versions are stored in two ways: -When Row IDs are supported (when the `writerFeatures` field of a table's `protocol` action contains `rowIds`), then: -- Writers must assign unique fresh Row IDs to all rows that they commit, i.e. writers must set the `baseRowId` field in all `add` actions that they commit so that all default generated Row IDs are unique in the table version. - Writers must never commit duplicate Row IDs in the table in any version. -- Writers must set the `baseRowId` field in `remove` actions to the `baseRowId` value (if present) of the last committed `add` action with the same `path`. -- Writers must include a `rowIdHighWaterMark` action whenever they assign new fresh Row IDs, i.e. whenever they commit an `add` action for a new physical file. - The `highWaterMark` value of the `rowIdHighWaterMark` action must always be equal to or greater than the highest fresh Row ID committed so far. - Writers are allowed to reserve Row IDs in a first commit and assign them in following commits. In that case, each commit must include a `rowIdHighWaterMark` action even when the `highWaterMark` value is not changing. +- **Default generated Row Commit Versions** use the `defaultRowCommitVersion` field in `add` and `remove` actions. + Default generated Row Commit Versions require little storage overhead but are reassigned every time a row is updated or moved to a different file (for instance when a row is contained in a file that is compacted by OPTIMIZE). -Writers can enable Row IDs by setting `delta.enableRowIds` to `true` in the `configuration` of the table's `metaData`. +- **Materialized Row Commit Versions** are stored in a column in the data files. + This column is hidden from readers and writers, i.e. it is not part of the `schemaString` in the table's `metaData`. + Instead, the name of this column can be found in the value for the `delta.rowTracking.materializedRowCommitVersionColumnName` key in the `configuration` of the table's `metaData` action. + This column may contain `null` values meaning that the corresponding row has no materialized Row Commit Version. This column may be omitted if all its values are `null` in the file. + Materialized Row Commit Versions provide a mechanism for writers to preserve Row Commit Versions for rows that are copied. + +The fresh Row Commit Version of a row is equal to the default generated Row Commit version. +The stable Row Commit Version of a row is equal to the materialized Row Commit Version of the row when that column is present and the value is not NULL, otherwise it is equal to the default generated Commit Version. + +## Reader Requirements for Row Tracking + +When Row Tracking is enabled (when the table property `delta.enableRowTracking` is set to `true`), then: +- When Row IDs are requested, readers must reconstruct stable Row IDs as follows: + 1. Readers must use the materialized Row ID if the column determined by `delta.rowTracking.materializedRowIdColumnName` is present in the data file and the column contains a non `null` value for a row. + 2. Otherwise, readers must use the default generated Row ID of the `add` or `remove` action containing the row in all other cases. + I.e. readers must add the index of the row in the file to the `baseRowId` of the `add` or `remove` action for the file containing the row. +- When Row Commit Versions are requested, readers must reconstruct them as follows: + 1. Readers must use the materialized Row Commit Versions if the column determined by `delta.rowTracking.materializedRowCommitVersionColumnName is present in the data file and the column contains a non `null` value for a row. + 2. Otherwise, Readers must use the default generated Row Commit Versions of the `add` or `remove` action containing the row in all other cases. + I.e. readers must use the `defaultRowCommitVersion` of the `add` or `remove` action for the file containing the row. +- Readers cannot read Row IDs and Row Commit Versions while reading change data files from `cdc` actions. + +## Writer Requirements for Row Tracking + +When Row Tracking is supported (when the `writerFeatures` field of a table's `protocol` action contains `rowTracking`), then: +- Writers must assign unique fresh Row IDs to all rows that they commit. + - Writers must set the `baseRowId` field in all `add` actions that they commit so that all default generated Row IDs are unique in the table version. + Writers must never commit duplicate Row IDs in the table in any version. + - Writers must set the `baseRowId` field in recommitted and checkpointed `add` actions and `remove` actions to the `baseRowId` value (if present) of the last committed `add` action with the same `path`. + - Writers must set the `baseRowId` field to a value that is higher than the `rowIdHighWatermark`. + - Writers must include a `rowIdHighWaterMark` action whenever they assign new fresh Row IDs that are higher than `highWaterMark` value of the current `rowIdHighWaterMark` action. + The `highWaterMark` value of the `rowIdHighWaterMark` action must always be equal to or greater than the highest fresh Row ID committed so far. + Writers can either commit the `rowIdHighWaterMark` in the same commit, or they can reserve the fresh Row IDs in an earlier commit. +- Writer must assign fresh Row Commit Versions to all rows that they commit. + - Writers must set the `defaultRowCommitVersion` field in new `add` actions to the version number of the log enty containing the `add` action. + - Writers must set the `defaultRowCommitVersion` field in recommitted and checkpointed `add` actions and `remove` actions to the `defaultRowCommitVersion` of the last committed `add` action with the same `path`. + +Writers can enable Row Tracking by setting `delta.enableRowTracking` to `true` in the `configuration` of the table's `metaData`. This is only allowed if the following requirements are satisfied: -- The feature `rowIds` has been added to the `writerFeatures` field of a table's `protocol` action either in the same version of the table or in an earlier version of the table. -- A physical column name for the materialized Row IDs has been assigned and added to the `configuration` in the table's `metaData` action using the key `delta.rowIds.physicalColumnName`. - - The assigned column name must be unique. It must not be equal to the name of any other column in the table's schema. - The assigned column name must remain unique in all future versions of the table. +- The feature `rowTracking` has been added to the `writerFeatures` field of a table's `protocol` action either in the same version of the table or in an earlier version of the table. +- The column name for the materialized Row IDs and Row Commit Versions have been assigned and added to the `configuration` in the table's `metaData` action using the keys `delta.rowTracking.materializedRowIdColumnName` and `delta.rowTracking.materializedRowCommitVersionColumnName` respectively. + - The assigned column names must be unique. They must not be equal to the name of any other column in the table's schema. + The assigned column names must remain unique in all future versions of the table. If [Column Mapping](#column-mapping) is enabled, then the assigned column name must be distinct from the physical column names of the table. -- The `baseRowId` field is set for all active `add` actions in the version of the table in which `delta.enableRowIds` is set to `true`. -- If the `baseRowId` field is not set in some active `add` action in the table, then writers must first commit new `add` actions that set the `baseRowId` field to replace the `add` actions that do not have the `baseRowId` field set. - This can be done in the commit that sets `delta.enableRowIds` to `true` or in an earlier commit. - The assigned `baseRowId` values must satisfy the same requirements as when assigning fresh Row IDs. +- The `baseRowId` and `defaultRowCommitVersion` fields are set for all active `add` actions in the version of the table in which `delta.enableRowTracking` is set to `true`. +- If the `baseRowId` and `defaultRowCommitVersion` fields are not set in some active `add` action in the table, then writers must first commit new `add` actions that set these fields to replace the `add` actions that do not have these fields set. + This can be done in the commit that sets `delta.enableRowTracking` to `true` or in an earlier commit. + The assigned `baseRowId` and `defaultRowCommitVersion` values must satisfy the same requirements as when assigning fresh Row IDs and fresh Row Commit Versions respectively. -When Row IDs are enabled (when the table property `delta.enableRowIds` is set to `true`), then: -- Writers should assign stable Row IDs to all rows. +When Row Tracking is enabled (when the table property `delta.enableRowTracking` is set to `true`), then: +- Writers must assign stable Row IDs to all rows. - Stable Row IDs must be unique within a version of the table and must not be equal to the fresh Row IDs of other rows in the same version of the table. - Writers should preserve the stable Row IDs of rows that are updated or copied using materialized Row IDs. - The preserved stable Row ID (i.e. a stable Row ID that is not equal to the fresh Row ID of the same physical row) should be equal to the stable Row ID of the same logical row before it was updated or copied. - - Materialized Row IDs must be written to the physical column determined by `delta.rowIds.physicalColumnName` in the `configuration` of the table's `metaData` action. - The value in this physical column must be set to `NULL` for stable Row IDs that are not preserved. -- Writers must set the `preservedRowIds` flag of the `rowIdHighWaterMark` action to `true` whenever all the stable Row IDs of rows that are updated or copied were preserved. - Writers must set that flag to false otherwise. In particular, writers must set the `preservedRowIds` flag of the `rowIdHighWaterMark` action to `true` if no rows are updated or copied. + - Materialized Row IDs must be written to the column determined by `delta.rowTracking.materializedRowIdColumnName` in the `configuration` of the table's `metaData` action. + The value in this column must be set to `NULL` for stable Row IDs that are not preserved. +- Writers must assign stable Row Commit Versions to all rows. + - Writers should preserve the stable Row Commit Versions of rows that are copied (but not updated) using materialized Row Commit Versions. + - The preserved stable Row Commit Version (i.e. a stable Row Commit Version that is not equal to the fresh Row Commit Version of the same physical row) should be equal to the stable Commit Version of the same logical row before it was copied. + - Materialized Row Commit Versions must be written to the column determined by `delta.rowTracking.materializedRowCommitVersionColumnName` in the `configuration` of the table's `metaData` action. + The value in this column must be set to `NULL` for stable Row Commit Versions that are not preserved (i.e. that are equal to the fresh Row Commit Version). +- Writers should set `delta.rowTracking.preserved` in the `tags` of the `commitInfo` action to `true` whenever all the stable Row IDs of rows that are updated or copied and all the stable Row Commit Versions of rows that are copied were preserved. + In particular, writers should set `delta.rowTracking.preserved` in the `tags` of the `commitInfo` action to `true` if no rows are updated or copied. + Writers should set that flag to false otherwise. # Requirements for Writers This section documents additional requirements that writers must follow in order to preserve some of the higher level guarantees that Delta provides. @@ -1062,7 +1121,7 @@ Feature | Name | Readers or Writers? [Column Mapping](#column-mapping) | `columnMapping` | Readers and writers [Identity Columns](#identity-columns) | `identityColumns` | Writers only [Deletion Vectors](#deletion-vectors) | `deletionVectors` | Readers and writers -[Row IDs](#row-ids) | `rowIds` | Writers only +[Row Tracking](#row-tracking) | `rowTracking` | Writers only [Timestamp without Timezone](#timestamp-ntz) | `timestampNTZ` | Readers and writers [Domain Metadata](#domain-metadata) | `domainMetadata` | Writers only @@ -1371,6 +1430,8 @@ The following examples uses a table with two partition columns: "date" and "regi | |-- dataChange: boolean | |-- stats: string | |-- tags: map +| |-- baseRowId: long +| |-- defaultRowCommitVersion: long | |-- partitionValues_parsed: struct | | |-- date: date | | |-- region: string @@ -1445,6 +1506,8 @@ Checkpoint schema (just the `add` column): | |-- dataChange: boolean | |-- stats: string | |-- tags: map +| |-- baseRowId: long +| |-- defaultRowCommitVersion: long | |-- partitionValues_parsed: struct | | |-- col-798f4abc-c63f-444c-9a04-e2cf1ecba115: date | | |-- col-19034dc3-8e3d-4156-82fc-8e05533c088e: string