-
Notifications
You must be signed in to change notification settings - Fork 20
/
Copy path60dbbe448c16_data_migrate.py
106 lines (83 loc) · 3.3 KB
/
60dbbe448c16_data_migrate.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
"""data_migrate
迁移 ID: 60dbbe448c16
父迁移: 1269fc241751
创建时间: 2024-12-03 13:41:39.689910
"""
from __future__ import annotations
import math
from collections.abc import Sequence
from alembic import op
from nonebot.log import logger
from sqlalchemy import insert, inspect, select
from sqlalchemy.ext.automap import automap_base
from sqlalchemy.orm import Session
revision: str = "60dbbe448c16"
down_revision: str | Sequence[str] | None = "1269fc241751"
branch_labels: str | Sequence[str] | None = None
depends_on: str | Sequence[str] | None = None
def data_migrate() -> None:
conn = op.get_bind()
insp = inspect(conn)
table_names = insp.get_table_names()
if "nonebot_plugin_memes_memegenerationrecord" not in table_names:
return
Base = automap_base()
Base.prepare(autoload_with=conn)
MemeGenerationRecord = Base.classes.nonebot_plugin_memes_memegenerationrecord
MemeGenerationRecordV2 = Base.classes.nonebot_plugin_memes_memegenerationrecord_v2
with Session(conn) as db_session:
count = db_session.query(MemeGenerationRecord).count()
if count == 0:
return
try:
from nonebot_session_to_uninfo import check_tables, get_id_map
except ImportError:
raise ValueError("请安装 `nonebot-session-to-uninfo` 以迁移数据")
check_tables()
migration_limit = 10000 # 每次迁移的数据量为 10000 条
last_message_id = -1
id_map: dict[int, int] = {}
logger.warning("memes: 正在迁移数据,请不要关闭程序...")
for i in range(math.ceil(count / migration_limit)):
statement = (
select(
MemeGenerationRecord.id,
MemeGenerationRecord.session_persist_id,
MemeGenerationRecord.time,
MemeGenerationRecord.meme_key,
)
.order_by(MemeGenerationRecord.id)
.where(MemeGenerationRecord.id > last_message_id)
.limit(migration_limit)
)
records = db_session.execute(statement).all()
last_message_id = records[-1][0]
session_ids = [record[1] for record in records if record[1] not in id_map]
if session_ids:
id_map.update(get_id_map(session_ids))
bulk_insert_records = []
for record in records:
bulk_insert_records.append(
{
"id": record[0],
"session_persist_id": id_map[record[1]],
"time": record[2],
"meme_key": record[3],
}
)
db_session.execute(insert(MemeGenerationRecordV2), bulk_insert_records)
logger.info(f"memes: 已迁移 {i * migration_limit + len(records)}/{count}")
db_session.commit()
logger.warning("memes: 数据迁移完成!")
def upgrade(name: str = "") -> None:
if name:
return
# ### commands auto generated by Alembic - please adjust! ###
data_migrate()
# ### end Alembic commands ###
def downgrade(name: str = "") -> None:
if name:
return
# ### commands auto generated by Alembic - please adjust! ###
pass
# ### end Alembic commands ###