Skip to content

Commit

Permalink
Fix AIP-74 migration errors (#43313)
Browse files Browse the repository at this point in the history
* fix(migrations): fix typos in the dataset to asset migration files and model

* fix(migrations): add server_default to the newly added "group" column in the dataset table

without this change, db with existing dataset rows break due to nullable=False
  • Loading branch information
Lee-W authored Oct 24, 2024
1 parent 7c6c106 commit 06da35c
Show file tree
Hide file tree
Showing 4 changed files with 15 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,9 @@ def upgrade():
# Add 'name' column. Set it to nullable for now.
with op.batch_alter_table("dataset", schema=None) as batch_op:
batch_op.add_column(sa.Column("name", _STRING_COLUMN_TYPE))
batch_op.add_column(sa.Column("group", _STRING_COLUMN_TYPE, default=str, nullable=False))
batch_op.add_column(
sa.Column("group", _STRING_COLUMN_TYPE, default=str, server_default="", nullable=False)
)
# Fill name from uri column.
with Session(bind=op.get_bind()) as session:
session.execute(sa.text("update dataset set name=uri"))
Expand Down
20 changes: 10 additions & 10 deletions airflow/migrations/versions/0040_3_0_0_rename_dataset_as_asset.py
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ def upgrade():
unique=False,
)
batch_op.create_foreign_key(
constraint_name="asset_alias_asset_alias_id_fk_key",
constraint_name="asset_alias_asset_alias_id_fkey",
referent_table="asset_alias",
local_cols=["alias_id"],
remote_cols=["id"],
Expand All @@ -152,7 +152,7 @@ def upgrade():
unique=False,
)
batch_op.create_foreign_key(
constraint_name="asset_alias_asset_asset_id_fk_key",
constraint_name="asset_alias_asset_asset_id_fkey",
referent_table="asset",
local_cols=["asset_id"],
remote_cols=["id"],
Expand All @@ -169,7 +169,7 @@ def upgrade():
unique=False,
)
batch_op.create_foreign_key(
constraint_name=op.f("asset_alias_asset_event_asset_id_fkey"),
constraint_name=op.f("asset_alias_asset_event_alias_id_fkey"),
referent_table="asset_alias",
local_cols=["alias_id"],
remote_cols=["id"],
Expand All @@ -185,7 +185,7 @@ def upgrade():
unique=False,
)
batch_op.create_foreign_key(
constraint_name=op.f("asset_alias_asset_event_event_id_fk_key"),
constraint_name=op.f("asset_alias_asset_event_event_id_fkey"),
referent_table="asset_event",
local_cols=["event_id"],
remote_cols=["id"],
Expand All @@ -199,7 +199,7 @@ def upgrade():
_rename_pk_constraint(
batch_op=batch_op,
original_name="dsdar_pkey",
new_name="asaar_pkey",
new_name="dsaar_pkey",
columns=["alias_id", "dag_id"],
)
_rename_index(
Expand All @@ -218,7 +218,7 @@ def upgrade():
ondelete="CASCADE",
)
batch_op.create_foreign_key(
constraint_name="dsaar_dag_id_fkey",
constraint_name="dsaar_dag_fkey",
referent_table="dag",
local_cols=["dag_id"],
remote_cols=["dag_id"],
Expand All @@ -230,7 +230,7 @@ def upgrade():

with op.batch_alter_table("dag_schedule_asset_reference", schema=None) as batch_op:
batch_op.drop_constraint("dsdr_dag_id_fkey", type_="foreignkey")
if op.get_bind().dialect.name in ("postgres", "mysql"):
if op.get_bind().dialect.name in ("postgresql", "mysql"):
batch_op.drop_constraint("dsdr_dataset_fkey", type_="foreignkey")

_rename_pk_constraint(
Expand Down Expand Up @@ -266,7 +266,7 @@ def upgrade():
batch_op.alter_column("dataset_id", new_column_name="asset_id", type_=sa.Integer(), nullable=False)

batch_op.drop_constraint("todr_dag_id_fkey", type_="foreignkey")
if op.get_bind().dialect.name in ("postgres", "mysql"):
if op.get_bind().dialect.name in ("postgresql", "mysql"):
batch_op.drop_constraint("todr_dataset_fkey", type_="foreignkey")

_rename_pk_constraint(
Expand Down Expand Up @@ -303,7 +303,7 @@ def upgrade():
batch_op.alter_column("dataset_id", new_column_name="asset_id", type_=sa.Integer(), nullable=False)

batch_op.drop_constraint("ddrq_dag_fkey", type_="foreignkey")
if op.get_bind().dialect.name in ("postgres", "mysql"):
if op.get_bind().dialect.name in ("postgresql", "mysql"):
batch_op.drop_constraint("ddrq_dataset_fkey", type_="foreignkey")

_rename_pk_constraint(
Expand Down Expand Up @@ -499,7 +499,7 @@ def downgrade():

_rename_pk_constraint(
batch_op=batch_op,
original_name="asaar_pkey",
original_name="dsaar_pkey",
new_name="dsdar_pkey",
columns=["alias_id", "dag_id"],
)
Expand Down
2 changes: 1 addition & 1 deletion airflow/models/asset.py
Original file line number Diff line number Diff line change
Expand Up @@ -295,7 +295,7 @@ class DagScheduleAssetAliasReference(Base):

__tablename__ = "dag_schedule_asset_alias_reference"
__table_args__ = (
PrimaryKeyConstraint(alias_id, dag_id, name="asaar_pkey"),
PrimaryKeyConstraint(alias_id, dag_id, name="dsaar_pkey"),
ForeignKeyConstraint(
(alias_id,),
["asset_alias.id"],
Expand Down
2 changes: 1 addition & 1 deletion docs/apache-airflow/img/airflow_erd.sha256
Original file line number Diff line number Diff line change
@@ -1 +1 @@
fa87dd4d48e630a5e523157bebb8710fcc4cee9527b735ac67ec1274d5858bce
bda4fb36d2ac0f34e60a9969b6c1c1e6a98b555b0fc6d0e7bfcee9a89fb95fbf

0 comments on commit 06da35c

Please sign in to comment.