-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathindex.js
188 lines (186 loc) · 6.57 KB
/
index.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
const fs = require('fs');
const mysql = require('mysql');
const async = require('async');
// Location of the JSON configuration file, which sits next to this script.
const configJsonFilePath = `${__dirname}/config.json`;
// Read the configuration file and parse it.
const rawConfig = fs.readFileSync(configJsonFilePath).toString('UTF-8');
const config = JSON.parse(rawConfig);
// Connection settings for the source and destination databases.
const { source, destination } = config;
// Create one connection per database.
const sourceConnection = mysql.createConnection(source);
const destinationConnection = mysql.createConnection(destination);
// Open both connections (source first, then destination).
[sourceConnection, destinationConnection].forEach((connection) => {
  connection.connect();
});
// Start a transaction on the destination connection; later code commits it
// or rolls it back.
destinationConnection.beginTransaction();
/**
 * Turns a SQL template containing `${column}` placeholders into a
 * parameterized statement.
 *
 * Every placeholder whose name is a key of `curRowData` is replaced by `?`
 * and the row's value is appended to the parameter list, in order of
 * appearance. Placeholders that name a key missing from the row are left in
 * the SQL text untouched.
 *
 * @param {string} template - SQL text taken from the field configuration.
 * @param {Object} curRowData - The source row currently being transferred.
 * @returns {{sql: string, params: Array}} The compiled SQL and its parameters.
 */
function compileSqlTemplate(template, curRowData) {
  const params = [];
  const sql = template.replace(/\$\{([A-Za-z0-9_-]+)\}/g, (placeholder, name) => {
    if (!(name in curRowData)) {
      return placeholder;
    }
    params.push(curRowData[name]);
    return '?';
  });
  return { sql, params };
}

/**
 * Runs the field's SQL template on `connection` and hands the first column of
 * the first result row to `cb` (or `null` when there is no row / no column).
 *
 * On a query error the SQL is logged, the destination transaction is rolled
 * back and the error is rethrown; `cb` is not called in that case.
 *
 * @param {Object} connection - Connection exposing `query(sql, params, cb)`.
 * @param {Object} fieldInfo - Field configuration; `fieldInfo.value` is the SQL template.
 * @param {Object} curRowData - The source row currently being transferred.
 * @param {Function} cb - Node-style callback `(err, value)`.
 */
function queryFirstColumn(connection, fieldInfo, curRowData, cb) {
  const { sql, params } = compileSqlTemplate(fieldInfo.value, curRowData);
  connection.query(sql, params, (err, result) => {
    if (err) {
      console.log('执行SQL:' + sql + '失败');
      destinationConnection.rollback();
      throw err;
    }
    if (result.length === 0) {
      return cb(null, null);
    }
    const firstRow = result[0];
    for (const key in firstRow) {
      if (Object.prototype.hasOwnProperty.call(firstRow, key)) {
        return cb(null, firstRow[key]);
      }
    }
    return cb(null, null);
  });
}

/**
 * Value strategies, keyed by a field's `valueType`.
 *
 * Each strategy has the signature `(fieldInfo, curRowData, cb)` and reports
 * the value to insert for that field through the Node-style callback `cb`.
 */
const strategies = {
  // Fixed value: use `fieldInfo.value` as is.
  custom: (fieldInfo, curRowData, cb) => {
    cb(null, fieldInfo.value);
  },
  // Reference: copy the source row's column named by `fieldInfo.value`.
  reference: (fieldInfo, curRowData, cb) => {
    cb(null, curRowData[fieldInfo.value]);
  },
  // Enum replacement: map the referenced source column through `fieldInfo.enum`.
  enumReplace: (fieldInfo, curRowData, cb) => {
    cb(null, fieldInfo.enum[curRowData[fieldInfo.value]]);
  },
  // Look the value up with a query against the source database.
  sourceQuery: (fieldInfo, curRowData, cb) => {
    queryFirstColumn(sourceConnection, fieldInfo, curRowData, cb);
  },
  // Look the value up with a query against the destination database.
  destinationQuery: (fieldInfo, curRowData, cb) => {
    queryFirstColumn(destinationConnection, fieldInfo, curRowData, cb);
  }
};
// Transfer the data in batches.
// Number of source rows fetched per batch.
const batchNum = 20;
// Zero-based index of the next batch to fetch.
let batch = 0;
// Count the source rows first so that progress can be reported.
sourceConnection.query('SELECT COUNT(*) AS `count` FROM `' + config.sourceTable + '`', (err, res) => {
  /**
   * Logs `message`, rolls back the destination transaction and rethrows
   * `error` so that the process stops.
   */
  function abort(message, error) {
    console.log(message);
    destinationConnection.rollback();
    throw error;
  }

  if (err) {
    abort('查询待转移数据总数出错', err);
  }

  // Build the INSERT statement once: one backtick-quoted column and one `?`
  // placeholder per configured destination field.
  const destinationTable = config.destinationTable;
  const columnList = destinationTable.fields.map((field) => '`' + field.name + '`').join(', ');
  const placeholderList = destinationTable.fields.map(() => '?').join(', ');
  const sql = 'INSERT INTO `' + destinationTable.name + '` (' + columnList + ') VALUES (' + placeholderList + ')';
  console.log('数据转移执行SQL:' + sql);

  // `count` is the number of rows expected to be transferred (reduced
  // whenever a row is skipped); `curCount` is the number inserted so far.
  let count = res[0].count;
  let curCount = 0;

  /**
   * Returns false when the row violates one of the configured assertions.
   * An assertion maps a source column to 'not null' or 'null'; columns that
   * are absent from the row are not checked.
   */
  function passesAssertions(rowData) {
    const assertions = destinationTable.assert;
    if (!assertions) {
      return true;
    }
    return Object.keys(assertions).every((key) => {
      if (!(key in rowData)) {
        return true;
      }
      const isNull = rowData[key] == null;
      if (assertions[key] === 'not null') {
        return !isNull;
      }
      if (assertions[key] === 'null') {
        return isNull;
      }
      return true;
    });
  }

  /**
   * Commits the destination transaction and exits. The exit happens inside
   * the commit callback so the process cannot terminate before the COMMIT
   * has been acknowledged.
   */
  function finish() {
    destinationConnection.commit((commitErr) => {
      if (commitErr) {
        abort('提交事务失败', commitErr);
      }
      console.log('数据转移完毕');
      process.exit(0);
    });
  }

  /**
   * Fetches the next batch of source rows and hands it to `transferData`.
   * A batch that comes back full (`batchNum` rows) means more may follow.
   */
  function batchHandle() {
    // NOTE(review): LIMIT paging without ORDER BY relies on the server
    // returning rows in a stable order between queries; consider ordering by
    // a unique key.
    const offset = batch++ * batchNum;
    sourceConnection.query(`SELECT * FROM \`${config.sourceTable}\` LIMIT ${offset}, ${batchNum}`, (batchErr, rows) => {
      if (batchErr) {
        abort(`第 ${batch} 批查询出错`, batchErr);
      }
      console.log(`第 ${batch} 批查询,数量 ${rows.length}`);
      transferData(rows, rows.length === batchNum);
    });
  }

  /**
   * Inserts one batch of rows into the destination table.
   *
   * @param {Array<Object>} data - Source rows of the current batch.
   * @param {boolean} next - Whether another batch should be fetched afterwards.
   */
  function transferData(data, next) {
    const rows = data.filter(passesAssertions);
    // Skipped rows no longer count towards the expected total.
    count -= data.length - rows.length;

    let remaining = rows.length;
    const batchDone = () => {
      if (next) {
        batchHandle();
      } else {
        finish();
      }
    };

    // Nothing to insert (empty batch or every row skipped): move on right
    // away, otherwise no insert callback would ever advance the transfer.
    if (remaining === 0) {
      batchDone();
      return;
    }

    rows.forEach((rowData) => {
      // Resolve the value of every destination field, in field order.
      const paramCallbacks = destinationTable.fields.map((field) => (cb) => {
        strategies[field.valueType](field, rowData, cb);
      });
      async.series(paramCallbacks, (seriesErr, results) => {
        if (seriesErr) {
          abort('生成字段参数失败', seriesErr);
        }
        destinationConnection.query(sql, results, (insertErr) => {
          if (insertErr) {
            abort('执行SQL:' + sql + '失败', insertErr);
          }
          ++curCount;
          console.log(`总共 ${count} 条数据,已经转移成功 ${curCount} 条数据`);
          if (--remaining === 0) {
            batchDone();
          }
        });
      });
    });
  }

  // Kick off the first batch.
  batchHandle();
});