类似Kettle数据ETL工具,同时比Kettle更加易用和轻量,底层基于duckdb,速度超快,是一款可以让普通用户快速使用的数据处理工具,基于插件机制,
可以快速配置各种数据处理工作流,让数据处理工作流就像搭积木一样,简单易用。
- 基于python的插件机制,目前提供70+插件,同时支持自定义插件
- 基于json的flow任务,支持自定义任务配置
- 底层基于duckdb的内存数据库,支持sql脚本和json配置,支持亿级别的数据进行join查询,并且毫秒出结果
- plugin_test.py 测试插件功能
- flow_test.py 测试flow功能
NiceFlow exec --path csv_input_ck_output.json
# 1.json中的参数可以使用param参数传入
NiceFlow exec --path 1.json --param '{"name":"test"}'
# --sql_script,"sql脚本语句,支持多行"
# --sql_path,"sql脚本文件,支持多行,和sql_script二选一"
# --db_path, "输入duckdb数据库的路径,不存在则为内存模式[可选]"
# --res_path,"输入文件路径,该路径下的文件会被自动加载到db中[可选]"
# --function_path,"输入函数路径,该路径为python文件,可以作为数据库自定义函数使用[可选]"
NiceFlow sql --sql_path 1.sql \
--res_path='C:/Users/xiaow/Desktop/22/test' \
--function_path='C:/Users/xiaow/Desktop/22/test/1.python'
# sql语句
copy (select f_print(d_date) from msd_2024 where d_date = '2023-12-31') to 'C:/Users/xiaow/Desktop/22/test/2.csv';
select f_print(d_date) from msd_2024 where d_date = '2023-12-31';
# python文件中定义函数
def f_print(x:str)->str:
return x+"___";
{
"flow": {
"name": "",
"uid": "",
"param": {
}
},
"nodes": [
{
"id": "FakerInput",
"name": "read1",
"type": "input",
"properties": {
"rows": 10000,
"columns": [
"name",
"address",
"city",
"street_address",
"date_of_birth",
"phone_number"
],
"randoms": [
{
"key": "sex",
"values": [
"男",
"女",
"未知"
]
}
]
}
},
{
"id": "Console",
"name": "write1",
"type": "output",
"properties": {
"row": 100
}
}
],
"edges": [
{
"startId": "read1",
"endId": "write1"
}
]
}
import os
from NiceFlow.core.flow import Flow
from NiceFlow.core.manager import FlowManager
def getProjectPath() -> str:
# 获取当前文件的绝对路径
current_file = os.path.abspath(__file__)
# 获取当前文件所在目录的绝对路径
current_directory = os.path.dirname(current_file)
# 获取当前项目的根目录
project_root = os.path.dirname(os.path.dirname(current_directory))
return project_root
def test_base():
path = getProjectPath() + "/doc/faker_input_console.json"
myFlow: Flow = FlowManager.read(path)
myFlow.run()
if __name__ == '__main__':
test_base()
插件 |
功能 |
完成情况 |
文档 |
Agg |
聚合组件 |
完成 |
聚合组件 |
Filter |
过滤器 |
完成 |
过滤器 |
Mapping |
映射器 |
完成 |
映射器 |
For |
遍历器 |
完成 |
遍历器 |
IF |
条件判断器 |
完成 |
条件判断器 |
Join |
连接器 |
完成 |
连接器 |
Mask |
脱敏器 |
完成 |
脱敏器 |
Pivot |
透视表 |
完成 |
透视表 |
Printer |
打印器 |
完成 |
打印器 |
RegularExtract |
正则提取器 |
|
正则提取器 |
Rename |
重命名器 |
完成 |
重命名器 |
Samples |
采样器 |
完成 |
采样器 |
Sort |
排序器 |
完成 |
排序器 |
SQL |
SQL转换器 |
完成 |
SQL转换器 |
Switch |
条件转换器 |
|
条件转换器 |
Unpivot |
取消透视表 |
完成 |
取消透视表 |
Variable |
变量转换器 |
完成 |
变量转换器 |
While |
循环转换器 |
完成 |
循环转换器 |
Duplicate |
去重器 |
完成 |
去重器 |
Console |
控制台打印 |
完成 |
控制台输出 |
SplitToRows |
列拆分为多行 |
完成 |
列转行 |
Function |
动态函数 |
完成 |
动态函数 |
插件 |
功能 |
完成情况 |
文档 |
FileOutput |
文件输出 |
完成 |
文件输出 |
KafkaOutput |
Kafka输出 |
完成 |
Kafka输出 |
SqlServerOutput |
SQLServer输出 |
完成 |
SQLServer输出 |
S3Output |
S3输出 |
完成 |
S3输出 |
PulsarOutput |
Pulsar输出 |
完成 |
Pulsar输出 |
PostgresOutput |
Postgres输出 |
完成 |
Postgres输出 |
ParquetOutput |
Parquet输出 |
完成 |
Parquet输出 |
PaimonOutput |
Paimon输出 |
完成 |
Paimon输出 |
OracleOutput |
Oracle输出 |
完成 |
Oracle输出 |
OdpsOutput |
MaxCompute输出 |
完成 |
MaxCompute输出 |
MySQLOutput |
MySQL输出 |
完成 |
MySQL输出 |
MqttOutput |
MQTT输出 |
|
|
MongoDBOutput |
MongoDB输出 |
完成 |
MongoDB输出 |
MarkdownOutput |
Markdown输出 |
完成 |
Markdown输出 |
HttpOutput |
Http输出 |
|
|
HiveOutput |
Hive输出 |
|
|
HdfsOutput |
HDFS输出 |
|
|
FtpOutput |
FTP输出 |
|
|
DB2Output |
写入数据到DB2数据库 |
|
|
DorisOutput |
写入数据到Doris数据库 |
|
|
DuckDBOutput |
写入数据到DuckDB数据库 |
|
|
ExcelOutput |
Excel输出 |
完成 |
Excel输出 |
ESOutput |
Elasticsearch输出 |
|
|
DuckOutput |
DuckDB输出 |
|
|
CsvOutput |
CSV输出 |
完成 |
CSV输出 |
CosOutput |
COS输出 |
|
|
ClickHouseOutput |
ClickHouse输出 |
|
ClickHouse输出 |
JsonOutput |
Json输出 |
|
Json输出 |
HtmlOutput |
html输出 |
|
Html输出 |
- 系统内置PyScript插件,该插件没有固定内容,可以自定义脚本,如下所示
{
"flow": {
"name": "",
"uid": "",
"param": {
} },
"nodes": [
{
"id": "FakerInput",
"name": "read1",
"type": "input",
"properties": {
"rows":10000,
"columns": ["name","address","city","street_address","date_of_birth","phone_number"],
"randoms":[
{"key":"sex","values":["男","女","未知"]}
]
}
},
{
"id": "PyScript",
"name": "write1",
"type": "output",
"properties": {
"content": "import json\n\nfrom NiceFlow.core.flow import Flow\nfrom NiceFlow.core.plugin import IPlugin\n\n\nclass PyScript(IPlugin):\n\n def init(self, param: json, flow: Flow):\n super(PyScript, self).init(param, flow)\n\n def execute(self):\n super(PyScript, self).execute()\n row = int(self.param.get(\"row\",10))\n\n # 获取上一步结果\n pre_node = self.pre_nodes[0]\n PyScript_df = self._pre_result_dict[pre_node.name]\n PyScript_df.limit(row).show()\n self.set_result(PyScript_df)\n\n\n def to_json(self):\n super(PyScript, self).to_json()\n\n def close(self):\n super(PyScript, self).close()"
}
}
],
"edges": [
{
"startId": "read1",
"endId": "write1"
}
]
}
- 使用
PluginManager.register_user_plugin("C://Users//xiaow//Downloads//plugins")
指定自定义插件路径
- 该目录下插件示例参考项目
src/plugins
目录
import unittest
from NiceFlow.core.flow import Flow
from NiceFlow.core.manager import FlowManager, PluginManager
class TestLoadUserPlugin(unittest.TestCase):
def test_load_user_plugin(self):
path = "faker_input_to_hello.json"
PluginManager.register_user_plugin("C://Users//xw//Downloads//plugins")
myFlow: Flow = FlowManager.read(path)
flow_param = {
}
myFlow.set_param(flow_param)
myFlow.run()
if __name__ == '__main__':
unittest.main()