Skip to content

Commit

Permalink
完善:rename组件完善+流程细节完善
Browse files Browse the repository at this point in the history
  • Loading branch information
BleethNie committed Sep 14, 2024
1 parent 71d0d38 commit 9778c02
Show file tree
Hide file tree
Showing 12 changed files with 302 additions and 61 deletions.
2 changes: 1 addition & 1 deletion README_2.md
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ conda activate fluentui

pip uninstall NiceFlow

pip install NiceFlow-0.0.5-py3-none-any.whl
pip install NiceFlow-0.0.7-py3-none-any.whl


twine upload --repository pypi dist/*
Expand Down
12 changes: 8 additions & 4 deletions doc/doc/Rename.md
Original file line number Diff line number Diff line change
Expand Up @@ -27,17 +27,21 @@ ___
"field": "date_id",
"rename": "new_date_id"
}
]
],
"selects": [],
"excludes": ["content"]
}
}
```


### 3.2 参数说明

| 参数名称 | 是否必须 | 默认值 | 描述 |
|----------|------|-----|---------|
| columns ||| 需要修改的字段 |
| 参数名称 | 是否必须 | 默认值 | 描述 |
|----------|------|-----|-----------|
| columns ||| 需要修改的字段 |
| selects ||| 需要保留的字段 |
| excludes ||| 需要drop的字段 |



Expand Down
36 changes: 36 additions & 0 deletions samples/if/csv_input_if.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
{
"flow": {
"name": "",
"uid": "",
"param": {

} },
"nodes": [
{
"id": "FakerInput",
"name": "read1",
"type": "input",
"properties": {
"rows":0,
"columns": ["name","address","city","street_address","date_of_birth","phone_number"],
"randoms":[
{"key":"sex","values":["男","女","未知"]}
]
}
},
{
"id": "Console",
"name": "write1",
"type": "output",
"properties": {
"row": 100
}
}
],
"edges": [
{
"startId": "read1",
"endId": "write1"
}
]
}
14 changes: 14 additions & 0 deletions samples/json_input/json_input_test.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
import unittest

import duckdb.duckdb

from NiceFlow.core.flow import Flow
from NiceFlow.core.manager import FlowManager

Expand All @@ -16,6 +18,18 @@ def test_Json_input_mysql_output(self):
myFlow: Flow = FlowManager.read(path)
myFlow.run()

def test_parquet_input_doris_output(self):
    # End-to-end run of the parquet -> Doris flow described by the sample config.
    # NOTE(review): asserts nothing; passes as long as the flow runs without raising.
    path = "parquet_input_for_doris_out.json"
    myFlow: Flow = FlowManager.read(path)
    myFlow.run()

def test_read_parquet(self):
    # Sanity check that a parquet file produced by an earlier run can be read back
    # with duckdb and printed.
    # NOTE(review): the uuid-named file is presumably left over from a DorisOutput
    # run — confirm the fixture exists, otherwise this test fails on read.
    file_name ="table_38879b32-6e73-11ef-a882-581cf802095d.parquet"
    df = duckdb.read_parquet(file_name)
    print(len(df))
    df.show()



if __name__ == '__main__':
unittest.main()
73 changes: 73 additions & 0 deletions samples/json_input/parquet_input_for_doris_out.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
{
"flow": {
"name": "",
"uid": "",
"param": {
}
},
"nodes": [
{
"id": "ParquetInput",
"name": "read1",
"type": "input",
"properties": {
"file_name": "a.parquet"
}
},
{
"id": "Samples",
"name": "samples",
"type": "input",
"properties": {
"sample_size": 50
}
},
{
"id": "Rename",
"name": "rename",
"type": "input",
"properties": {
"excludes": []
}
},{
"id": "Function",
"name": "trans",
"type": "input",
"properties": {
"columns": [
{"key": "content", "function": "empty()"}
]
}
},
{
"id": "DorisOutput",
"name": "out",
"type": "translate",
"properties": {
"fe_host": "192.168.199.2",
"fe_query_port": "9030",
"fe_http_port": "8040",
"username": "root",
"password": "doris_new_passwd",
"db": "test",
"table":"news_record"
}
}

],
"edges": [
{
"startId": "read1",
"endId": "samples"
},{
"startId": "samples",
"endId": "trans"
},{
"startId": "trans",
"endId": "rename"
},{
"startId": "rename",
"endId": "out"
}
]
}
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ def read_file(filename):
setup(
# 基本信息
name="NiceFlow", # 项目名,确定唯一,不然上传 pypi 会失败
version="0.0.5", # 项目版本
version="0.0.7", # 项目版本
author='BleethNie', # 开发者
author_email='[email protected]', # 开发者邮箱
description='ETL数据处理/数据迁移/数据分析工具', # 摘要描述
Expand Down
3 changes: 3 additions & 0 deletions src/NiceFlow/core/functions.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@

def print_hello(content) -> str:
    """Return the greeting ``"hello:"`` followed by the string form of *content*."""
    return f"hello:{str(content)}"

def empty() -> str:
    """Return an empty string (registered as a column-clearing flow function)."""
    return str()
111 changes: 86 additions & 25 deletions src/NiceFlow/plugins/doris_output.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,39 +19,100 @@ def execute(self):

fe_host = self.param.get("fe_host", "127.0.0.1")
fe_http_port = self.param.get("fe_http_port", "8040")
fe_query_port = self.param.get("fe_query_port", "9030")
db = self.param.get("db", "")
table = self.param.get("table", "")
username = self.param.get("username", "root")
password = self.param.get("password", "123456")
#
# doris_client = DorisClient(fe_host=fe_host,
# fe_query_port=fe_query_port,
# fe_http_port=fe_http_port,
# username=username,
# password=password,
# db=db)
# doris_client.write_from_df(my_df.to_df(), f"{db}.{table}", "unique", ['id'], )
my_df.to_parquet("a.parquet")
upload_file(fe_host, fe_http_port, db, table, "a.parquet", username, password, "1111")


from pydoris.doris_client import DorisClient

client = DorisClient(fe_host, fe_query_port,fe_http_port, username, password,db)
table_path = f"table_{str(uuid.uuid1())}.parquet"
my_df.to_df().to_parquet(table_path,engine="pyarrow",index=False)
b = open(table_path, 'rb')
option = WriteOptions()
option.set_auto_uuid_label()\
.set_format("parquet")\
.set_option("column_separator",",")\
.set_option("columns","")\
.set_option("max_filter_ratio","20")
client.write(table_name="test.news_record",data=b,options=option)


# doris_helper=make_doris_helper(fe_host, fe_http_port, username, password)


# doris_helper.upload_csv(db, table, table_path)

self.set_result(None)

def to_json(self):
super(DorisOutput, self).to_json()


def upload_file(fe_host: str, fe_http_port: str, db: str, table: str, filepath: str, username: str, password: str):
    """Stream-load a local parquet file into ``db.table`` via the Doris FE HTTP API.

    Sends an HTTP PUT to the FE ``_stream_load`` endpoint with basic auth and
    prints the FE's JSON response body. Raises whatever ``requests`` raises on
    connection failure.
    """
    url = f'http://{fe_host}:{fe_http_port}/api/{db}/{table}/_stream_load'
    doris_headers = {
        # A fresh uuid1 label per call lets Doris deduplicate retried loads.
        'label': str(uuid.uuid1()),
        'column_separator': ';',
        # Doris stream load expects the standard "100-continue" token;
        # the previous value "100 continue" (missing hyphen) was malformed.
        'expect': '100-continue',
        "format": "parquet",
        'Content-Type': 'application/octet-stream'
    }
    auth = HTTPBasicAuth(username, password)
    # "with" guarantees the handle is closed even if the request raises
    # (previously the file leaked on any requests exception).
    with open(filepath, "rb") as p_file:
        response = requests.put(url, data=None, files={'file': p_file}, headers=doris_headers, auth=auth)
    print(response.text)
class DorisHelper():
    """Upload local files into Doris tables through the FE stream-load HTTP API.

    One instance holds the FE connection settings; the ``upload_*`` methods
    differ only in the ``format`` header sent to Doris.
    """

    def __init__(self, fe_host: str, fe_http_port: str, username: str, password: str):
        # FE (frontend) host / HTTP port plus basic-auth credentials.
        self.fe_host = fe_host
        self.fe_http_port = fe_http_port
        self.username = username
        self.password = password

    def _build_headers(self, data_format: str) -> dict:
        # Shared stream-load headers; previously duplicated verbatim in every
        # upload_* method. A fresh uuid1 label per call lets Doris deduplicate
        # retried loads.
        return {
            'label': str(uuid.uuid1()),
            'column_separator': ';',
            "format": data_format,
            'Content-Type': 'application/octet-stream'
        }

    def upload_parquet(self, db: str, table: str, filepath: str):
        """Stream-load a parquet file into ``db.table``."""
        stream_file = open(filepath, "rb")
        self._upload(db, table, self._build_headers("parquet"), stream_file)

    def upload_json(self, db: str, table: str, filepath: str):
        """Stream-load a JSON file into ``db.table``."""
        stream_file = open(filepath, "rb")
        self._upload(db, table, self._build_headers("json"), stream_file)

    def upload_orc(self, db: str, table: str, filepath: str):
        """Stream-load an ORC file into ``db.table``."""
        stream_file = open(filepath, "rb")
        self._upload(db, table, self._build_headers("orc"), stream_file)

    def upload_csv(self, db: str, table: str, filepath: str):
        """Stream-load a CSV file into ``db.table``."""
        stream_file = open(filepath, "rb")
        self._upload(db, table, self._build_headers("csv"), stream_file)

    def _upload(self, db: str, table: str, headers, stream_file):
        """PUT ``stream_file`` to the stream-load endpoint; always closes the file."""
        url = f'http://{self.fe_host}:{self.fe_http_port}/api/{db}/{table}/_stream_load'
        auth = HTTPBasicAuth(self.username, self.password)
        try:
            response = requests.put(url, data=None, files={'file': stream_file}, headers=headers, auth=auth)
            # FE returns a JSON status body; surfaced for the caller/logs.
            print(response.text)
        finally:
            # Previously only closed on success; finally prevents a handle
            # leak when the request raises.
            stream_file.close()


def make_doris_helper(fe_host: str, fe_http_port: str, username: str, password: str):
    # Factory for DorisHelper; keeps call sites decoupled from the class constructor.
    return DorisHelper(fe_host, fe_http_port, username, password)
14 changes: 8 additions & 6 deletions src/NiceFlow/plugins/function.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,9 @@ def execute(self):

# 获取上一步结果
pre_node = self.pre_nodes[0]
df = self._pre_result_dict[pre_node.name]
table_columns = df.columns
pre_df = self._pre_result_dict[pre_node.name]
pre_df = pre_df.to_df()
table_columns = pre_df.columns

sql = "select * "
replace_sql = "REPLACE ( "
Expand All @@ -41,13 +42,14 @@ def execute(self):
else:
as_sql = as_sql + f"{function} as {key} , "
if replace_sql != "REPLACE ( ":
sql = sql + replace_sql.removesuffix(", ") + "), " + as_sql.removesuffix(", ") + "from df"
sql = f"{sql} {replace_sql.removesuffix(', ')} {as_sql.removesuffix(', ')}) from pre_df"
else:
sql = sql + "," + as_sql.removesuffix(", ") + "from df"
sql = sql + "," + as_sql.removesuffix(", ") + "from pre_df"

logger.debug("sql = {}".format(sql))
df = duckdb.from_df(self.con.sql(sql).df())
self.set_result(df)
df = duckdb.sql(sql,connection=self.con).to_df()
next_df = duckdb.from_df(df)
self.set_result(next_df)

def to_json(self):
super(Function, self).to_json()
Expand Down
33 changes: 24 additions & 9 deletions src/NiceFlow/plugins/rename.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,30 @@ def execute(self):

# 获取上一步结果
pre_node = self.pre_nodes[0]
df = self._pre_result_dict[pre_node.name]

columns = self.param["columns"]
rename_dict = {}
for column in columns:
rename_dict[column["field"]] = column["rename"]
df = df.to_df().rename(rename_dict, axis=1)
df = duckdb.from_df(df)
self.set_result(df)
duck_df = self._pre_result_dict[pre_node.name]
# 需要修改名称的字段
columns = self.param.get("columns",[])
# 需要选择的字段
selects = self.param.get("selects",[])
# 需要排除的字段
excludes = self.param.get("excludes",[])

if len(selects)>0:
duck_df = duck_df.select(selects)

if len(excludes)>0:
df = duck_df.to_df().drop(columns=excludes)
duck_df = duckdb.from_df(df)

if len(columns)>0:
rename_dict = {}
for column in columns:
rename_dict[column["field"]] = column["rename"]
df = duck_df.to_df().rename(rename_dict, axis=1)
duck_df = duckdb.from_df(df)


self.set_result(duck_df)

def to_json(self):
super(Rename, self).to_json()
1 change: 1 addition & 0 deletions src/test/flow_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ def test_flow_result(self):
myFlow.run()
result_dict = myFlow.get_result()
duck_df = list(result_dict.values())[0]
myFlow.set_result("frame_input",duck_df)
print(duck_df)

if __name__ == '__main__':
Expand Down
Loading

0 comments on commit 9778c02

Please sign in to comment.