类似Kettle数据ETL工具,同时比Kettle更加易用和轻量,底层基于duckdb,速度超快,是一款可以让普通用户快速使用的数据处理工具,基于插件机制, 可以快速配置各种数据处理工作流,让数据处理工作流就像搭积木一样,简单易用。
- 基于python的插件机制,目前提供70+插件,同时支持自定义插件
- 基于json的flow任务,支持自定义任务配置
- 底层基于duckdb的内存数据库,支持sql脚本和json配置,支持亿级别的数据进行join查询,并且毫秒出结果
pip install NiceFlow
- plugin_test.py 测试插件功能
- flow_test.py 测试flow功能
pip install NiceFlow
NiceFlow exec --path csv_input_ck_output.json
# 1.json中的参数可以使用param参数传入
NiceFlow exec --path 1.json --param '{"name":"test"}'
# --sql_script,"sql脚本语句,支持多行"
# --sql_path,"sql脚本文件,支持多行,和sql_script二选一"
# --db_path, "输入duckdb数据库的路径,不存在则为内存模式[可选]"
# --res_path,"输入文件路径,该路径下的文件会被自动加载到db中[可选]"
# --function_path,"输入函数路径,该路径为python文件,可以作为数据库自定义函数使用[可选]"
NiceFlow sql --sql_path 1.sql \
--res_path='C:/Users/xiaow/Desktop/22/test' \
--function_path='C:/Users/xiaow/Desktop/22/test/1.python'
# sql语句
copy (select f_print(d_date) from msd_2024 where d_date = '2023-12-31') to 'C:/Users/xiaow/Desktop/22/test/2.csv';
select f_print(d_date) from msd_2024 where d_date = '2023-12-31';
# python文件中定义函数
def f_print(x:str)->str:
return x+"___";
- faker_input_console.json
{
"flow": {
"name": "",
"uid": "",
"param": {
}
},
"nodes": [
{
"id": "FakerInput",
"name": "read1",
"type": "input",
"properties": {
"rows": 10000,
"columns": [
"name",
"address",
"city",
"street_address",
"date_of_birth",
"phone_number"
],
"randoms": [
{
"key": "sex",
"values": [
"男",
"女",
"未知"
]
}
]
}
},
{
"id": "Console",
"name": "write1",
"type": "output",
"properties": {
"row": 100
}
}
],
"edges": [
{
"startId": "read1",
"endId": "write1"
}
]
}
import os
from NiceFlow.core.flow import Flow
from NiceFlow.core.manager import FlowManager
def getProjectPath() -> str:
# 获取当前文件的绝对路径
current_file = os.path.abspath(__file__)
# 获取当前文件所在目录的绝对路径
current_directory = os.path.dirname(current_file)
# 获取当前项目的根目录
project_root = os.path.dirname(os.path.dirname(current_directory))
return project_root
def test_base():
path = getProjectPath() + "/doc/faker_input_console.json"
myFlow: Flow = FlowManager.read(path)
myFlow.run()
if __name__ == '__main__':
test_base()
插件 | 功能 | 完成情况 | 文档 |
---|---|---|---|
AkshareInput | 读取金融股票等财经数据 | 完成 | Akshare输入 |
Starter | 启动器 | 启动器 | |
CsvInput | 读取CSV数据 | 完成 | CSV输入 |
FakerInput | 假数据生成 | 完成 | 假数据生成 |
ParquetInput | 读取Parquet数据 | 完成 | Parquet输入 |
ExcelInput | 读取Excel数据 | 完成 | Excel输入 |
MySQLInput | 读取MySQL数据 | 完成 | MySQL输入 |
DuckDBInput | 读取DuckDB数据 | 完成 | DuckDB输入 |
ClickHouseInput | 读取ClickHouse数据 | 完成 | ClickHouse输入 |
OdpsInput | 读取MaxCompute数据 | 完成 | Odps输入 |
ESInput | 读取Elasticsearch数据 | 完成 | Elasticsearch输入 |
MongoDBInput | 读取MongoDB数据 | 完成 | MongoDB输入 |
MqttInput | 从Mqtt Broker读取数据 | 完成 | MqttInput输入 |
DB2Input | 读取DB2数据库数据 | ||
DorisInput | 读取Doris数据库数据 | ||
DuckDBInput | 读取DuckDB数据库数据 | ||
FrameInput | 从内存中读取数据 | 完成 | FrameInput输入 |
JsonInput | json文件读取 | 完成 | JsonInput输入 |
插件 | 功能 | 完成情况 | 文档 |
---|---|---|---|
Agg | 聚合组件 | 完成 | 聚合组件 |
Filter | 过滤器 | 完成 | 过滤器 |
Mapping | 映射器 | 完成 | 映射器 |
For | 遍历器 | 完成 | 遍历器 |
IF | 条件判断器 | 完成 | 条件判断器 |
Join | 连接器 | 完成 | 连接器 |
Mask | 脱敏器 | 完成 | 脱敏器 |
Pivot | 透视表 | 完成 | 透视表 |
Printer | 打印器 | 完成 | 打印器 |
RegularExtract | 正则提取器 | 正则提取器 | |
Rename | 重命名器 | 完成 | 重命名器 |
Samples | 采样器 | 完成 | 采样器 |
Sort | 排序器 | 完成 | 排序器 |
SQL | SQL转换器 | 完成 | SQL转换器 |
Switch | 条件转换器 | 条件转换器 | |
Unpivot | 取消透视表 | 完成 | 取消透视表 |
Variable | 变量转换器 | 完成 | 变量转换器 |
While | 循环转换器 | 完成 | 循环转换器 |
Duplicate | 去重器 | 完成 | 去重器 |
Console | 控制台打印 | 完成 | 控制台输出 |
SplitToRows | 列拆分为多行 | 完成 | 列转行 |
Function | 动态函数 | 完成 | 动态函数 |
插件 | 功能 | 完成情况 | 文档 |
---|---|---|---|
FileOutput | 文件输出 | 完成 | 文件输出 |
KafkaOutput | Kafka输出 | 完成 | Kafka输出 |
SqlServerOutput | SQLServer输出 | 完成 | SQLServer输出 |
S3Output | S3输出 | 完成 | S3输出 |
PulsarOutput | Pulsar输出 | 完成 | Pulsar输出 |
PostgresOutput | Postgres输出 | 完成 | Postgres输出 |
ParquetOutput | Parquet输出 | 完成 | Parquet输出 |
PaimonOutput | Paimon输出 | 完成 | Paimon输出 |
OracleOutput | Oracle输出 | 完成 | Oracle输出 |
OdpsOutput | MaxCompute输出 | 完成 | MaxCompute输出 |
MySQLOutput | MySQL输出 | 完成 | MySQL输出 |
MqttOutput | MQTT输出 | ||
MongoDBOutput | MongoDB输出 | 完成 | MongoDB输出 |
MarkdownOutput | Markdown输出 | 完成 | Markdown输出 |
HttpOutput | Http输出 | ||
HiveOutput | Hive输出 | ||
HdfsOutput | HDFS输出 | ||
FtpOutput | FTP输出 | ||
DB2Output | 写入数据到DB2数据库 | ||
DorisOutput | 写入数据到Doris数据库 | ||
DuckDBOutput | 写入数据到DuckDB数据库 | ||
ExcelOutput | Excel输出 | 完成 | Excel输出 |
ESOutput | Elasticsearch输出 | ||
DuckOutput | DuckDB输出 | ||
CsvOutput | CSV输出 | 完成 | CSV输出 |
CosOutput | COS输出 | ||
ClickHouseOutput | ClickHouse输出 | ClickHouse输出 | |
JsonOutput | Json输出 | Json输出 | |
HtmlOutput | html输出 | Html输出 |
- 系统内置PyScript插件,该插件没有固定内容,可以自定义脚本,如下所示
{
"flow": {
"name": "",
"uid": "",
"param": {
} },
"nodes": [
{
"id": "FakerInput",
"name": "read1",
"type": "input",
"properties": {
"rows":10000,
"columns": ["name","address","city","street_address","date_of_birth","phone_number"],
"randoms":[
{"key":"sex","values":["男","女","未知"]}
]
}
},
{
"id": "PyScript",
"name": "write1",
"type": "output",
"properties": {
"content": "import json\n\nfrom NiceFlow.core.flow import Flow\nfrom NiceFlow.core.plugin import IPlugin\n\n\nclass PyScript(IPlugin):\n\n def init(self, param: json, flow: Flow):\n super(PyScript, self).init(param, flow)\n\n def execute(self):\n super(PyScript, self).execute()\n row = int(self.param.get(\"row\",10))\n\n # 获取上一步结果\n pre_node = self.pre_nodes[0]\n PyScript_df = self._pre_result_dict[pre_node.name]\n PyScript_df.limit(row).show()\n self.set_result(PyScript_df)\n\n\n def to_json(self):\n super(PyScript, self).to_json()\n\n def close(self):\n super(PyScript, self).close()"
}
}
],
"edges": [
{
"startId": "read1",
"endId": "write1"
}
]
}
- 使用
PluginManager.register_user_plugin("C://Users//xiaow//Downloads//plugins")
指定自定义插件路径 - 该目录下插件示例参考项目
src/plugins
目录
import unittest
from NiceFlow.core.flow import Flow
from NiceFlow.core.manager import FlowManager, PluginManager
class TestLoadUserPlugin(unittest.TestCase):
def test_load_user_plugin(self):
path = "faker_input_to_hello.json"
PluginManager.register_user_plugin("C://Users//xw//Downloads//plugins")
myFlow: Flow = FlowManager.read(path)
flow_param = {
}
myFlow.set_param(flow_param)
myFlow.run()
if __name__ == '__main__':
unittest.main()