-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathStructure_Steam_Calculator.py
132 lines (120 loc) · 4.72 KB
/
Structure_Steam_Calculator.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
import platform
import os
import argparse
import csv
import io
import logging
import sys
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from typing import Iterable
from datetime import datetime
from pyflink.common import Types, WatermarkStrategy, Time, Encoder
from pyflink.common.watermark_strategy import TimestampAssigner
from pyflink.datastream import StreamExecutionEnvironment, ProcessWindowFunction
from pyflink.datastream.connectors.file_system import FileSink, OutputFileConfig, RollingPolicy
from pyflink.datastream.connectors.kafka import FlinkKafkaProducer, FlinkKafkaConsumer
from pyflink.datastream.window import SlidingEventTimeWindows, TimeWindow
from pyflink.table import StreamTableEnvironment
from pyflink.datastream import StreamExecutionEnvironment, RuntimeExecutionMode
from pyflink.datastream.connectors.file_system import FileSource, StreamFormat
from pyflink.common import SimpleStringSchema
# 定义一个beep函数,根据不同的操作系统,播放不同的声音
def beep():
# 如果是Windows系统
if platform.system() == "Windows":
# 导入winsound模块
import winsound
# 播放440Hz的音调,持续1000ms
winsound.Beep(440, 1000)
# 如果是Linux系统
elif platform.system() == "Linux":
# 播放beep命令
os.system("beep")
# 如果是其他系统
else:
# 打印不支持的平台
print("Unsupported platform")
# 定义一个parse_csv函数,用于解析csv文件
def parse_csv(x):
# 将x中的[b'替换为空
x = x.replace("[b'", "")
# 将x中的\n']替换为空
x = x.replace("\n']", "")
# 将x中的\\n']替换为空
x = x.replace("\\n']", "")
# 将x中的\r']替换为空
x = x.replace("\r", "")
# 将x中的\\r']替换为空
x = x.replace("\\r", "")
# 将x转换为csv格式
result = csv.reader(io.StringIO(x))
# 创建一个空列表,用于存放解析后的结果
parsed_result = []
# 遍历result中的每一项
for item in result:
# 创建一个空列表,用于存放解析后的每一项
parsed_item = []
# 遍历item中的每一项
for element in item:
# 尝试将element转换为整数
try:
parsed_element = int(element)
# 如果转换失败,则将element的值赋给parsed_element
except ValueError:
parsed_element = element
# 将parsed_element添加到parsed_item中
parsed_item.append(parsed_element)
# 将parsed_item添加到parsed_result中
parsed_result.append(parsed_item)
# 返回解析后的结果
return parsed_result
# 定义一个count_rows函数,用于计算data中行数
def count_rows(data):
# 计算data中行数
row_count = len(data)
# 获取data的类型
type_count = type(data)
# 打印data中行数和类型
print(f"Received {row_count} rows of {type_count} data.")
# 返回data
return data
# 定义一个check_data函数,用于检查data中每一行的数据
def check_data(data):
# 检查data中第一行的数据是否大于0.5
if abs(float(data[0][1])) >= 0.5:
# 如果大于0.5,则播放beep函数
beep()
# 打印data中第一行的数据和ABS值
# print(f"data at {data[0][0]} is {(data[0][1])}",f" ABS Larger than 0.5!\n")
# 返回data
return abs(float(data[0][1])) >= 0.5
# 定义一个read_from_kafka函数,用于从kafka中读取数据
def read_from_kafka():
# 获取StreamExecutionEnvironment实例
env = StreamExecutionEnvironment.get_execution_environment()
# 添加jars
env.add_jars("file:///home/hadoop/Desktop/PyFlink-Tutorial/flink-sql-connector-kafka-3.1-SNAPSHOT.jar")
# 打印提示信息
print("start reading data from kafka")
# 创建一个FlinkKafkaConsumer实例,用于从kafka中读取数据
kafka_consumer = FlinkKafkaConsumer(
topics='structure',
deserialization_schema= SimpleStringSchema('UTF-8'),
properties={'bootstrap.servers': 'localhost:9092', 'group.id': 'my-group'}
)
# 将kafka_consumer添加到StreamExecutionEnvironment中
stream = env.add_source(kafka_consumer)
# 将stream中的每一行数据转换为csv格式
parsed_stream = stream.map(parse_csv)
# 将parsed_stream中的每一行数据传入check_data函数,检查数据是否符合要求
data_stream = parsed_stream.filter(check_data)
# 将data_stream中的数据打印到标准输出中
print("Printing result to stdout.")
data_stream.print()
# 执行StreamExecutionEnvironment
env.execute()
if __name__ == '__main__':
# 调用read_from_kafka函数,从kafka中读取数据
read_from_kafka()