Skip to content

Commit

Permalink
新增:新增mqtt_input插件
Browse files Browse the repository at this point in the history
  • Loading branch information
BleethNie committed Mar 31, 2024
1 parent 32fad38 commit c50a954
Show file tree
Hide file tree
Showing 6 changed files with 111 additions and 23 deletions.
42 changes: 42 additions & 0 deletions samples/mqtt_input/mqtt_input_for_mysql_output.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
{
"flow": {
"name": "",
"uid": "",
"param": {
}
},
"nodes": [
{
"id": "MqttInput",
"name": "read1",
"type": "input",
"properties": {
"host": "192.168.1.90",
"port": 1883,
"topic": "/a/a",
"username": "",
"password": ""
}
},
{
"id": "MySQLOutput",
"name": "out",
"type": "translate",
"properties": {
"host": "127.0.0.1",
"port": 3306,
"username": "root",
"password": "123456",
"db": "che",
"table": "che_config",
"write_method": "insert"
}
}
],
"edges": [
{
"startId": "read1",
"endId": "out"
}
]
}
17 changes: 17 additions & 0 deletions samples/mqtt_input/mqtt_input_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
import unittest

from NiceFlow.core.flow import Flow
from NiceFlow.core.manager import FlowManager


class TestMqttInput(unittest.TestCase):

def test_mqtt_input(self):
path = "mqtt_input_for_mysql_output.json"
myFlow: Flow = FlowManager.read(path)
myFlow.run()



if __name__ == '__main__':
unittest.main()
4 changes: 0 additions & 4 deletions src/NiceFlow/plugins/ftp_input.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,5 @@
import json

import duckdb
import pandas as pd
from clickhouse_driver import Client

from NiceFlow.core.flow import Flow
from NiceFlow.core.plugin import IPlugin

Expand Down
55 changes: 41 additions & 14 deletions src/NiceFlow/plugins/mqtt_input.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
from NiceFlow.core.flow import Flow
from NiceFlow.core.plugin import IPlugin
import paho.mqtt.client as mqtt
from paho.mqtt import client as mqtt_client


class MqttInput(IPlugin):
Expand All @@ -15,20 +14,40 @@ def init(self, param: json, flow: Flow):
super(MqttInput, self).init(param, flow)

def on_connect(self, client, userdata, flags, rc):
print("Connected with result code " + str(rc))
client.subscribe("$SYS/#")

def subscribe(self,client: mqtt_client):
def on_message(client, userdata, msg):
print(f"Received `{msg.payload.decode()}` from `{msg.topic}` topic")
print(msg.topic + " " + str(msg.payload))
df = pd.DataFrame(msg.payload)
ck_df = duckdb.from_df(df)
# 写入结果
self.set_result(ck_df)

# 0: Connection successful
# 1: Connection refused - incorrect protocol version
# 2: Connection refused - invalid client identifier
# 3: Connection refused - server unavailable
# 4: Connection refused - bad username or password
# 5: Connection refused - not authorised
# 6-255: Currently unused.
if rc == 0:
print("Connection successful")
if rc == 1:
print("Connection refused - incorrect protocol version")
if rc == 2:
print("Connection refused - invalid client identifier")
if rc == 3:
print("Connection refused - server unavailable")
if rc == 4:
print("Connection refused - bad username or password")
if rc == 5:
print("Connection refused - not authorised")
if rc!=0:
return
# 在连接建立后订阅主题
client.subscribe(self.topic)
client.on_message = on_message

def on_message(self, client, userdata, msg):
msg_str = msg.payload.decode('utf-8')
json_data = json.loads(msg_str)
if isinstance(json_data,list):
df = pd.DataFrame(json_data)
else:
df = pd.DataFrame([json_data])
ck_df = duckdb.from_df(df)
# 写入结果
self.set_result(ck_df)

def execute(self):
super(MqttInput, self).execute()
Expand All @@ -37,15 +56,23 @@ def execute(self):
host = self.param["host"]
port = self.param.get("port", 1883)
self.topic = self.param.get("topic", "")
username = self.param.get("username", None)
password = self.param.get("password", None)
clientId = self.param.get("client_id", "")

# 配置数据库
if clientId:
client = mqtt.Client(client_id=clientId)
else:
client = mqtt.Client()

# 配置消息回调
client.on_connect = self.on_connect
client.on_message = self.on_message

if username:
client.username_pw_set(username=username, password=password)

client.connect(host, port)
client.loop_forever()

Expand Down
12 changes: 9 additions & 3 deletions src/NiceFlow/plugins/pivot.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
import json

import duckdb

from NiceFlow.core.flow import Flow
from NiceFlow.core.plugin import IPlugin

Expand All @@ -11,12 +13,16 @@ def init(self, param: json,flow:Flow):

def execute(self):
super(Pivot, self).execute()
# PIVOT Cities ON Year USING sum(Population);
key = self.param["key"]
value = self.param["value"]
agg = self.param["agg"]

self.param[""]
# 获取上一步结果
pre_node = self.pre_nodes[0]
pre_df = self._pre_result_dict[pre_node.name]
self.set_result(pre_df)
duck_df = self._pre_result_dict[pre_node.name]
result_df = duckdb.sql(f"PIVOT duck_df ON {key} USING by {agg}({value}) ")
self.set_result(result_df)

def to_json(self):
super(Pivot, self).to_json()
4 changes: 2 additions & 2 deletions src/plugins/hello/plugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,9 @@ def init(self, param: json, flow: Flow):
def execute(self):
super(HelloInput, self).execute()
df = pd.DataFrame([{"F1":"1","F2":"2"}])
ck_df = duckdb.from_df(df)
duck_df = duckdb.from_df(df)
# 写入结果
self.set_result(ck_df)
self.set_result(duck_df)


def to_json(self):
Expand Down

0 comments on commit c50a954

Please sign in to comment.