Skip to content

Commit

Permalink
Automatically update the REVISIONS_HEADS_MAP in db.py (apache#33616)
Browse files Browse the repository at this point in the history
* Automatically update the REVISIONS_HEADS_MAP in db.py

Until now, updating this map has been a manual step performed by the release manager.
In 2.7.0, I mistakenly added a wrong value for the revision, which means that
anyone upgrading with `airflow db migrate --version 2.7.0` will end up with an incorrect
migration.
This PR automates the update so that the map is always set to the correct value through pre-commit.

* fixup! Automatically update the REVISIONS_HEADS_MAP in db.py

* Account for missing versions in revision heads map

* Provide the full path of the DB file in message

* Make REVISION_HEADS_MAP private

* Only add from 2.0.0 up
  • Loading branch information
ephraimbuddy authored Aug 22, 2023
1 parent 8ed38c1 commit 513c1d2
Show file tree
Hide file tree
Showing 5 changed files with 78 additions and 44 deletions.
2 changes: 1 addition & 1 deletion .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -412,7 +412,7 @@ repos:
language: python
entry: ./scripts/ci/pre_commit/pre_commit_version_heads_map.py
pass_filenames: false
additional_dependencies: ['packaging']
additional_dependencies: ['packaging','google-re2']
- id: update-version
name: Update version to the latest version in the documentation
entry: ./scripts/ci/pre_commit/pre_commit_update_versions.py
Expand Down
31 changes: 26 additions & 5 deletions airflow/cli/commands/db_command.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
from airflow import settings
from airflow.exceptions import AirflowException
from airflow.utils import cli as cli_utils, db
from airflow.utils.db import REVISION_HEADS_MAP
from airflow.utils.db import _REVISION_HEADS_MAP
from airflow.utils.db_cleanup import config_dict, drop_archived_tables, export_archived_records, run_cleanup
from airflow.utils.process_utils import execute_interactive
from airflow.utils.providers_configuration_loader import providers_configuration_loaded
Expand Down Expand Up @@ -65,6 +65,27 @@ def upgradedb(args):
migratedb(args)


def get_version_revision(version: str, recursion_limit: int = 10) -> str | None:
    """
    Find the migration revision head for the given Airflow version.

    Looks ``version`` up in ``_REVISION_HEADS_MAP``. When it is missing, earlier patch
    releases of the same ``major.minor`` series are tried in descending order, since a
    release without its own entry shares the revision of the closest earlier patch release.

    :param version: version string, e.g. ``"2.6.3"``.
    :param recursion_limit: maximum number of versions to look up, counting ``version``
        itself. At least one lookup is always performed.
    :return: the revision mapped to ``version`` or to the nearest earlier patch release,
        or ``None`` when nothing is found within the limit or ``version`` is not a plain
        ``major.minor.patch`` string.
    """
    if version in _REVISION_HEADS_MAP:
        return _REVISION_HEADS_MAP[version]
    try:
        major, minor, patch = map(int, version.split("."))
    except ValueError:
        # Not of the form "X.Y.Z" with integer parts (e.g. "2.7.0rc1"): nothing to walk back from.
        return None
    # The direct lookup above already used one attempt of the budget.
    for _ in range(recursion_limit - 1):
        patch -= 1
        if patch < 0:
            # Patch 0 was the last valid candidate; never probe invalid versions like "2.7.-1".
            return None
        candidate = f"{major}.{minor}.{patch}"
        if candidate in _REVISION_HEADS_MAP:
            return _REVISION_HEADS_MAP[candidate]
    # More than ``recursion_limit`` successive versions without a known revision.
    return None


@cli_utils.action_cli(check_db=False)
@providers_configuration_loaded
def migratedb(args):
Expand All @@ -85,12 +106,12 @@ def migratedb(args):
elif args.from_version:
if parse_version(args.from_version) < parse_version("2.0.0"):
raise SystemExit("--from-version must be greater or equal to than 2.0.0")
from_revision = REVISION_HEADS_MAP.get(args.from_version)
from_revision = get_version_revision(args.from_version)
if not from_revision:
raise SystemExit(f"Unknown version {args.from_version!r} supplied as `--from-version`.")

if args.to_version:
to_revision = REVISION_HEADS_MAP.get(args.to_version)
to_revision = get_version_revision(args.to_version)
if not to_revision:
raise SystemExit(f"Upgrading to version {args.to_version} is not supported.")
elif args.to_revision:
Expand Down Expand Up @@ -129,11 +150,11 @@ def downgrade(args):
if args.from_revision:
from_revision = args.from_revision
elif args.from_version:
from_revision = REVISION_HEADS_MAP.get(args.from_version)
from_revision = get_version_revision(args.from_version)
if not from_revision:
raise SystemExit(f"Unknown version {args.from_version!r} supplied as `--from-version`.")
if args.to_version:
to_revision = REVISION_HEADS_MAP.get(args.to_version)
to_revision = get_version_revision(args.to_version)
if not to_revision:
raise SystemExit(f"Downgrading to version {args.to_version} is not supported.")
elif args.to_revision:
Expand Down
16 changes: 2 additions & 14 deletions airflow/utils/db.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,39 +67,27 @@

log = logging.getLogger(__name__)

REVISION_HEADS_MAP = {
_REVISION_HEADS_MAP = {
"2.0.0": "e959f08ac86c",
"2.0.1": "82b7c48c147f",
"2.0.2": "2e42bb497a22",
"2.1.0": "a13f7613ad25",
"2.1.1": "a13f7613ad25",
"2.1.2": "a13f7613ad25",
"2.1.3": "97cdd93827b8",
"2.1.4": "ccde3e26fe78",
"2.2.0": "7b2661a43ba3",
"2.2.1": "7b2661a43ba3",
"2.2.2": "7b2661a43ba3",
"2.2.3": "be2bfac3da23",
"2.2.4": "587bdf053233",
"2.2.5": "587bdf053233",
"2.3.0": "b1b348e02d07",
"2.3.1": "1de7bc13c950",
"2.3.2": "3c94c427fdf6",
"2.3.3": "f5fcbda3e651",
"2.3.4": "f5fcbda3e651",
"2.4.0": "ecb43d2a1842",
"2.4.1": "ecb43d2a1842",
"2.4.2": "b0d31815b5a6",
"2.4.3": "e07f49787c9d",
"2.5.0": "290244fb8b83",
"2.5.1": "290244fb8b83",
"2.5.2": "290244fb8b83",
"2.5.3": "290244fb8b83",
"2.6.0": "98ae134e6fff",
"2.6.1": "98ae134e6fff",
"2.6.2": "c804e5c76e3e",
"2.6.3": "c804e5c76e3e",
"2.7.0": "788397e78828",
"2.7.0": "405de8318b3a",
}


Expand Down
2 changes: 0 additions & 2 deletions dev/README_RELEASE_AIRFLOW.md
Original file line number Diff line number Diff line change
Expand Up @@ -280,7 +280,6 @@ The Release Candidate artifacts we vote upon should be the exact ones we vote ag
./dev/airflow-github changelog v2-3-stable v2-3-test
```
- Update the `REVISION_HEADS_MAP` at airflow/utils/db.py to include the revision head of the release even if there are no migrations.
- Commit the version change.
- PR from the 'test' branch to the 'stable' branch
Expand Down Expand Up @@ -973,7 +972,6 @@ This includes:
- Modify `./scripts/ci/pre_commit/pre_commit_supported_versions.py` and let pre-commit do the job.
- For major/minor release, update version in `airflow/__main__.py`, `docs/docker-stack/` and `airflow/api_connexion/openapi/v1.yaml` to the next likely minor version release.
- Update the `REVISION_HEADS_MAP` at airflow/utils/db.py to include the revision head of the release even if there are no migrations.
- Sync `RELEASE_NOTES.rst` (including deleting relevant `newsfragments`) and `README.md` changes.
- Updating `Dockerfile` with the new version.
- Updating `airflow_bug_report.yml` issue template in `.github/ISSUE_TEMPLATE/` with the new version.
Expand Down
71 changes: 49 additions & 22 deletions scripts/ci/pre_commit/pre_commit_version_heads_map.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,42 +18,69 @@
# under the License.
from __future__ import annotations

import ast
import os
import sys
from pathlib import Path

from packaging.version import Version
import re2
from packaging.version import parse as parse_version

PROJECT_SOURCE_ROOT_DIR = Path(__file__).resolve().parent.parent.parent.parent

DB_FILE = PROJECT_SOURCE_ROOT_DIR / "airflow" / "utils" / "db.py"
MIGRATION_PATH = PROJECT_SOURCE_ROOT_DIR / "airflow" / "migrations" / "versions"

sys.path.insert(0, str(Path(__file__).parent.resolve())) # make sure common_precommit_utils is importable

from common_precommit_utils import read_airflow_version # noqa: E402

# Build a {airflow version: revision} mapping by scanning the files found in MIGRATION_PATH.
# NOTE(review): in this view the lines of the removed ``read_revision_heads_map`` appear
# interleaved with the lines of ``revision_heads_map``; indentation is not preserved.
def revision_heads_map():
rh_map = {}
# Patterns locating the ``revision = "<hex>"`` and ``airflow_version = "X.Y.Z"`` assignments.
pattern = r'revision = "[a-fA-F0-9]+"'
airflow_version_pattern = r'airflow_version = "\d+\.\d+\.\d+"'
# NOTE(review): os.listdir returns every directory entry; each one is passed to open() below
# without filtering — confirm MIGRATION_PATH can never contain sub-directories such as __pycache__.
filenames = os.listdir(MIGRATION_PATH)

def read_revision_heads_map():
revision_heads_map_ast_obj = ast.parse(open(DB_FILE).read())
# Sort key: the integer before the first "_" in the file name, or 0 when that prefix is not numeric.
def sorting_key(filen):
prefix = filen.split("_")[0]
return int(prefix) if prefix.isdigit() else 0

revision_heads_map_ast = [
a
for a in revision_heads_map_ast_obj.body
if isinstance(a, ast.Assign) and a.targets[0].id == "REVISION_HEADS_MAP"
][0]
sorted_filenames = sorted(filenames, key=sorting_key)

revision_heads_map = ast.literal_eval(revision_heads_map_ast.value)

return revision_heads_map.keys()
# Files are visited in ascending prefix order and later files overwrite earlier entries,
# so each version ends up mapped to the revision of the last file that declares it.
for filename in sorted_filenames:
with open(os.path.join(MIGRATION_PATH, filename)) as file:
content = file.read()
revision_match = re2.search(pattern, content)
airflow_version_match = re2.search(airflow_version_pattern, content)
# Files lacking either assignment are ignored.
if revision_match and airflow_version_match:
# The value is the text between the first pair of double quotes of the matched assignment.
revision = revision_match.group(0).split('"')[1]
version = airflow_version_match.group(0).split('"')[1]
# Only versions from 2.0.0 upwards are recorded.
if parse_version(version) >= parse_version("2.0.0"):
rh_map[version] = revision
return rh_map


# Script entry point: regenerate the ``_REVISION_HEADS_MAP`` literal in DB_FILE from the migration files.
# NOTE(review): in this view the lines of the previous implementation (which only checked that the
# current version was present in the map) appear interleaved with the new logic; indentation is not preserved.
if __name__ == "__main__":
airflow_version = Version(read_airflow_version())
if airflow_version.is_devrelease or "b" in (airflow_version.pre or ()):
exit(0)
versions = read_revision_heads_map()
if airflow_version.base_version not in versions:
print("Current airflow version is not in the REVISION_HEADS_MAP")
print("Current airflow version:", airflow_version)
print("Please add the version to the REVISION_HEADS_MAP at:", DB_FILE)
sys.exit(3)
with open(DB_FILE) as file:
content = file.read()

# Matches from ``_REVISION_HEADS_MAP = {`` up to the first ``}``.
pattern = r"_REVISION_HEADS_MAP = {[^}]+\}"
match = re2.search(pattern, content)
if not match:
print(
f"_REVISION_HEADS_MAP not found in {DB_FILE}. If this has been removed intentionally, "
"please update scripts/ci/pre_commit/pre_commit_version_heads_map.py"
)
sys.exit(1)

existing_revision_heads_map = match.group(0)
rh_map = revision_heads_map()
# Render the expected literal, one ``"version": "revision",`` entry per line, for a textual comparison.
updated_revision_heads_map = "_REVISION_HEADS_MAP = {\n"
for k, v in rh_map.items():
updated_revision_heads_map += f' "{k}": "{v}",\n'
updated_revision_heads_map += "}"
# Rewrite DB_FILE only when the rendered literal differs from the existing text.
if existing_revision_heads_map != updated_revision_heads_map:
new_content = content.replace(existing_revision_heads_map, updated_revision_heads_map)

with open(DB_FILE, "w") as file:
file.write(new_content)
print("_REVISION_HEADS_MAP updated in db.py. Please commit the changes.")
# Exit status 1 after modifying the file — presumably so the hook run is reported as failed
# until the change is committed; confirm against the pre-commit configuration.
sys.exit(1)

0 comments on commit 513c1d2

Please sign in to comment.