Commit: Adding streaming service.

giacuong171 committed Dec 10, 2024
1 parent af60d20 commit b9ef5a3

Showing 88 changed files with 1,578 additions and 1 deletion.
6 changes: 5 additions & 1 deletion Makefile
@@ -8,4 +8,8 @@ datalake_restart:
 airflow_up:
 	docker compose -f airflow-docker-compose.yaml up -d
 airflow_down:
-	docker compose -f airflow-docker-compose.yaml down
+	docker compose -f airflow-docker-compose.yaml down
+kafka_up:
+	docker compose -f stream_processing/kafka/docker-compose.yml up -d
+kafka_down:
+	docker compose -f stream_processing/kafka/docker-compose.yml down
Binary file added imgs/airflow.png
Binary file added imgs/airflow1.png
Binary file added imgs/airflow2.png
Binary file added imgs/airflow3.png
Binary file added imgs/airflow4.png
Binary file added imgs/airflow5.png
Binary file added imgs/airflow6.png
Binary file added imgs/airflow7.png
Binary file added imgs/architecture.png
Binary file added imgs/architecture1.png
Binary file added imgs/create_compute_instance.gif
Binary file added imgs/data.png
Binary file added imgs/datadrawio.drawio.png
Binary file added imgs/dbt.png
Binary file added imgs/debezium-cdc.webp
Binary file added imgs/debezium.png
Binary file added imgs/deequ.png
Binary file added imgs/final.png
Binary file added imgs/gcp.png
Binary file added imgs/gcp1.png
Binary file added imgs/gcp2.png
Binary file added imgs/gcp3.png
Binary file added imgs/grafana.png
Binary file added imgs/kafka-topic.png
Binary file added imgs/kafka.png
Binary file added imgs/kafka1.png
Binary file added imgs/kafka2.1.png
Binary file added imgs/kafka2.png
Binary file added imgs/kafka_mess.png
Binary file added imgs/kibana.png
Binary file added imgs/minio.png
Binary file added imgs/minio1.png
Binary file added imgs/minio2.png
Binary file added imgs/minio3.png
Binary file added imgs/minio4.png
Binary file added imgs/minio5.png
Binary file added imgs/minio6.png
Binary file added imgs/mle.drawio.png
Binary file added imgs/mle2.drawio.png
Binary file added imgs/ssh_key_out.gif
Binary file added imgs/trino.png
Binary file added imgs/validation.png
Binary file added jars/avro-1.11.1.jar
Binary file added jars/deequ-2.0.3-spark-3.3.jar
Binary file added jars/flink-avro-1.16.0.jar
Binary file added jars/flink-avro-1.17.1.jar
Binary file added jars/flink-avro-confluent-registry-1.16.0.jar
Binary file added jars/flink-avro-confluent-registry-1.17.1.jar
Binary file added jars/flink-connector-kafka-1.16.0.jar
Binary file added jars/flink-connector-kafka-1.17.1.jar
Binary file added jars/flink-table-api-java-1.16.0.jar
Binary file added jars/flink-table-api-java-1.17.1.jar
Binary file added jars/hadoop-aws-3.3.4.jar
Binary file added jars/jackson-annotations-2.14.2.jar
Binary file added jars/jackson-core-2.14.2.jar
Binary file added jars/jackson-databind-2.14.2.jar
Binary file added jars/kafka-clients-3.4.0.jar
Binary file added jars/kafka-connect-jdbc-10.6.4.jar
Binary file added jars/kafka-schema-registry-client-5.3.0.jar
Binary file added jars/postgresql-42.4.3.jar
Binary file added jars/trino-jdbc-434.jar
2 changes: 2 additions & 0 deletions stream_processing/.gitignore
@@ -0,0 +1,2 @@
old-dc.yml
.DS_Store
41 changes: 41 additions & 0 deletions stream_processing/README.MD
@@ -0,0 +1,41 @@
## Streaming data
+ Streaming data is data that is continuously generated by thousands of sources, which typically send records in small sizes.
+ In this project, we use Kafka as the streaming data source and PyFlink to process the stream.
+ The data is generated from the NYC taxi data in the data lake (you can get it from `data/stream/stream.parquet`).
### Streaming data source
+ NYC taxi streaming data is generated from data in the data lake
+ Each newly created sample is stored in a table in PostgreSQL
+ Debezium acts as a change-data-capture (CDC) connector for PostgreSQL and scans the table for newly committed rows
+ Newly created rows are pushed to the corresponding Kafka topics
+ Any consumer can then receive messages from the topics it subscribes to
#### How-to guide
First, change directory to `stream_processing/kafka`.
+ Run `bash run.sh register_connector configs/postgresql-cdc.json` to send the PostgreSQL config to Debezium (a sketch of what this step does follows the screenshot below)
![](../imgs/debezium.png)
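`run.sh` is not shown in this hunk; registering a connector typically amounts to POSTing its config to the Kafka Connect REST API. The sketch below illustrates that, assuming hypothetical connector name, credentials, and table list rather than the actual contents of `configs/postgresql-cdc.json`:

```python
# Hedged sketch: register a Debezium PostgreSQL CDC connector via the
# Kafka Connect REST API. All config values here are assumptions.
import json

import requests

connector = {
    "name": "nyc-taxi-cdc",  # hypothetical connector name
    "config": {
        "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
        "database.hostname": "postgres",
        "database.port": "5432",
        "database.user": "postgres",
        "database.password": "postgres",
        "database.dbname": "postgres",
        "table.include.list": "public.nyc_taxi",
        # A topic prefix of "nyc_taxi" would yield the topic
        # nyc_taxi.public.nyc_taxi seen later in this README.
        "topic.prefix": "nyc_taxi",
    },
}

resp = requests.post(
    "http://localhost:8083/connectors",  # Kafka Connect REST endpoint
    headers={"Content-Type": "application/json"},
    data=json.dumps(connector),
)
resp.raise_for_status()
print(resp.json())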
+ Run `python create_table.py` to create a new table in PostgreSQL
+ Run `python insert_table.py` to insert data into the table (a sketch of both steps follows the screenshot below)
+ Access the Kafka control center at port 9021 to check the results
![](../imgs/kafka.png)
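The two scripts themselves are not included in this diff; the following is a minimal sketch, assuming a psycopg2 connection with placeholder credentials and a reduced column set (the real scripts use the full NYC-taxi schema):

```python
# Hedged sketch of create_table.py / insert_table.py: create the table,
# then insert one sample row. Connection details are assumptions.
import psycopg2

conn = psycopg2.connect(
    host="localhost", port=5432, dbname="postgres",
    user="postgres", password="postgres",
)
with conn, conn.cursor() as cur:
    cur.execute(
        """
        CREATE TABLE IF NOT EXISTS nyc_taxi (
            nyc_taxi_id SERIAL PRIMARY KEY,
            tpep_pickup_datetime TIMESTAMP,
            trip_distance REAL,
            total_amount REAL
        )
        """
    )
    cur.execute(
        "INSERT INTO nyc_taxi (tpep_pickup_datetime, trip_distance, total_amount) "
        "VALUES (%s, %s, %s)",
        ("2024-12-10 08:15:00", 3.2, 18.5),  # illustrative sample values
    )
conn.close()
```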
+ Then click the **Topics** tab to list all existing topics on Kafka
![](../imgs/kafka1.png)
+ **nyc_taxi.public.nyc_taxi** is the topic created in this setup
+ Choose **Messages** to observe the streaming messages
![](../imgs/kafka_mess.png)
+ Finally, build and push the Kafka producer image for the streaming service:
```bash
cd stream_processing/kafka
docker build -t nyc_producer:latest .
# ${name} is your Docker Hub username
docker image tag nyc_producer:latest ${name}/nyc_producer:latest
docker push ${name}/nyc_producer:latest
```
### Streaming processing
+ This streaming source can be processed with PyFlink or Kafka Streams; in this project, we use PyFlink
#### How-to guide
+ `cd stream_processing/scripts`
+ Run `python datastream_api.py && python window_datastream_api.py`
+ These scripts extract the relevant fields from each message and aggregate the data for downstream use (a sketch follows the screenshot below)
+ Processed records are written back to Kafka at the specified sink topics
![](../imgs/kafka1.png)
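`datastream_api.py` and `window_datastream_api.py` are not included in this hunk; below is a minimal PyFlink sketch in the same spirit: read the CDC topic, extract two fields, and sum fares per pickup location over a tumbling window. The JSON envelope handling, jar path, and field choices are assumptions (the real jobs may use the Avro/Confluent-registry jars added in this commit):

```python
# Hedged sketch, not the repo's actual datastream_api.py.
import json

from pyflink.common import WatermarkStrategy
from pyflink.common.serialization import SimpleStringSchema
from pyflink.common.time import Time
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.connectors.kafka import KafkaOffsetsInitializer, KafkaSource
from pyflink.datastream.window import TumblingProcessingTimeWindows

env = StreamExecutionEnvironment.get_execution_environment()
# Path is an assumption; the commit ships connector jars under jars/
env.add_jars("file:///opt/flink/jars/flink-connector-kafka-1.17.1.jar")

source = (
    KafkaSource.builder()
    .set_bootstrap_servers("localhost:9092")
    .set_topics("nyc_taxi.public.nyc_taxi")
    .set_group_id("nyc-taxi-pyflink")
    .set_starting_offsets(KafkaOffsetsInitializer.earliest())
    .set_value_only_deserializer(SimpleStringSchema())
    .build()
)

def extract(raw: str):
    # Assumes a Debezium JSON envelope with the row under payload.after.
    row = json.loads(raw)["payload"]["after"]
    return row["pulocationid"], float(row["total_amount"])

(
    env.from_source(source, WatermarkStrategy.no_watermarks(), "nyc-taxi-cdc")
    .map(extract)
    .key_by(lambda kv: kv[0])
    .window(TumblingProcessingTimeWindows.of(Time.minutes(1)))
    .reduce(lambda a, b: (a[0], a[1] + b[1]))  # sum total_amount per location
    .print()
)

env.execute("nyc_taxi_window_datastream")
```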
+ **nyc_taxi.sink.datastream** and **nyc_taxi.sink_window.datastream** are the sink and window sink defined in this setup
+ Run `python kafka_consumer.py` (a consumer sketch follows below)
+ Messages from the topic are stored and used for further processing (analysis, visualization, cost prediction, ...)
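A minimal sketch of a `kafka_consumer.py`-style consumer, assuming plain JSON values on the sink topic:

```python
# Hedged sketch of a consumer for the processed sink topic.
import json

from kafka import KafkaConsumer  # kafka-python, pinned in the producer image

consumer = KafkaConsumer(
    "nyc_taxi.sink.datastream",
    bootstrap_servers="localhost:9092",
    auto_offset_reset="earliest",
    value_deserializer=lambda raw: json.loads(raw.decode("utf-8")),
)
for message in consumer:
    # Hand records off to analysis, visualization, or cost prediction here.
    print(message.value)
```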
22 changes: 22 additions & 0 deletions stream_processing/flink/Dockerfile
@@ -0,0 +1,22 @@
FROM python:3.8-slim

# Copy app handler code

COPY kafka_producer/produce.py produce.py
COPY kafka_producer/generate_schemas.py generate_schemas.py
COPY kafka_producer/streamming_data.parquet streamming_data.parquet
COPY run.sh .

# Install dependencies (the duplicate pandas pin is dropped; the later
# pin, 1.5.3, was the one that took effect)
RUN pip3 install kafka-python==2.0.2
RUN pip3 install avro==1.11.1
RUN pip3 install pyarrow==10.0.1
RUN pip3 install python-schema-registry-client==2.4.1
RUN pip3 install pymongo==4.5.0
RUN pip3 install pandas==1.5.3

# Generate a random schema at build time
RUN chmod +x /run.sh && ./run.sh generate_schemas

CMD [ "python", "-u", "produce.py", "--mode", "setup", "--bootstrap_servers", "broker:29092"]
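`produce.py` itself is not shown in this diff; the following is a rough sketch consistent with this Dockerfile's dependencies and CMD. The topic name and JSON serialization are assumptions; the real producer likely uses the Avro schemas and schema registry client pinned above:

```python
# Hedged sketch of a produce.py-style producer: replay parquet rows into Kafka.
import argparse
import json

import pandas as pd
from kafka import KafkaProducer  # kafka-python==2.0.2

parser = argparse.ArgumentParser()
parser.add_argument("--mode", default="setup")
parser.add_argument("--bootstrap_servers", default="broker:29092")
args = parser.parse_args()

producer = KafkaProducer(
    bootstrap_servers=args.bootstrap_servers,
    # default=str keeps timestamps JSON-serializable
    value_serializer=lambda record: json.dumps(record, default=str).encode("utf-8"),
)

df = pd.read_parquet("streamming_data.parquet")  # file name as copied above
for _, row in df.iterrows():
    producer.send("nyc_taxi", row.to_dict())  # topic name is an assumption
producer.flush()
```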
88 changes: 88 additions & 0 deletions stream_processing/flink/avro_schemas/schema_0.avsc
@@ -0,0 +1,88 @@
{
"doc": "Sample schema to help you get started.",
"fields": [
{
"name": "nyc_taxi_id",
"type": "int"
},
{
"name": "created",
"type": "string"
},
{
"name": "vendorid",
"type": "int"
},
{
"name": "tpep_pickup_datetime",
"type": "string"
},
{
"name": "tpep_dropoff_datetime",
"type": "string"
},
{
"name": "passenger_count",
"type": "float"
},
{
"name": "trip_distance",
"type": "float"
},
{
"name": "ratecodeid",
"type": "float"
},
{
"name": "store_and_fwd_flag",
"type": "string"
},
{
"name": "pulocationid",
"type": "int"
},
{
"name": "dolocationid",
"type": "int"
},
{
"name": "payment_type",
"type": "int"
},
{
"name": "fare_amount",
"type": "float"
},
{
"name": "extra",
"type": "float"
},
{
"name": "mta_tax",
"type": "float"
},
{
"name": "tip_amount",
"type": "float"
},
{
"name": "tolls_amount",
"type": "float"
},
{
"name": "improvement_surcharge",
"type": "float"
},
{
"name": "total_amount",
"type": "float"
},
{
"name": "congestion_surcharge",
"type": "float"
}
],
"name": "nyctaxi",
"namespace": "example.avro",
"type": "record"
}
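This schema pairs with the avro==1.11.1 dependency pinned in the producer image; below is a minimal sketch of encoding one record against it (sample values are illustrative):

```python
# Hedged sketch: binary-encode one record with schema_0.avsc using avro==1.11.1.
import io

import avro.io
import avro.schema

with open("avro_schemas/schema_0.avsc") as f:
    schema = avro.schema.parse(f.read())

record = {  # illustrative sample values for every field in the schema
    "nyc_taxi_id": 1, "created": "2024-12-10 08:15:00", "vendorid": 2,
    "tpep_pickup_datetime": "2024-12-10 08:00:00",
    "tpep_dropoff_datetime": "2024-12-10 08:15:00",
    "passenger_count": 1.0, "trip_distance": 3.2, "ratecodeid": 1.0,
    "store_and_fwd_flag": "N", "pulocationid": 161, "dolocationid": 237,
    "payment_type": 1, "fare_amount": 14.5, "extra": 0.5, "mta_tax": 0.5,
    "tip_amount": 2.0, "tolls_amount": 0.0, "improvement_surcharge": 0.3,
    "total_amount": 18.5, "congestion_surcharge": 2.5,
}

buf = io.BytesIO()
avro.io.DatumWriter(schema).write(record, avro.io.BinaryEncoder(buf))
payload = buf.getvalue()  # bytes ready to publish to Kafka
print(len(payload), "bytes")
```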
12 changes: 12 additions & 0 deletions stream_processing/flink/configs/connect-timescaledb-sink.json
@@ -0,0 +1,12 @@
{
"name": "nyctaxi-sink",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
"tasks.max": "1",
"topics": "sink_nyctaxi_0",
"connection.url": "jdbc:postgresql://host.docker.internal:5432/k6",
"connection.user": "k6",
"connection.password": "k6",
"auto.create": true
}
}
158 changes: 158 additions & 0 deletions stream_processing/flink/docker-compose_flink.yaml
@@ -0,0 +1,158 @@
---
version: '3.8'
services:
zookeeper:
image: confluentinc/cp-zookeeper:7.5.0
# hostname: zookeeper
container_name: flink-zookeeper
ports:
- "2181:2181"
healthcheck:
test: echo srvr | nc zookeeper 2181 || exit 1
start_period: 10s
retries: 20
interval: 10s
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000

# Kafka broker
broker:
image: confluentinc/cp-server:7.5.0
# hostname: broker
container_name: flink-broker
depends_on:
- zookeeper
ports:
- "9092:9092"
- "9101:9101"
healthcheck:
test: nc -z localhost 9092 || exit -1
start_period: 15s
interval: 5s
timeout: 10s
retries: 10
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:29092,PLAINTEXT_HOST://localhost:9092
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1

# For managing Avro schemas
schema-registry:
image: confluentinc/cp-schema-registry:7.5.0
# hostname: schema-registry
container_name: flink-schema-registry
depends_on:
- broker
ports:
- "8081:8081"
healthcheck:
start_period: 10s
interval: 10s
retries: 20
test: curl --user superUser:superUser --fail --silent --insecure http://localhost:8081/subjects --output /dev/null || exit 1
environment:
SCHEMA_REGISTRY_HOST_NAME: schema-registry
SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: 'broker:29092'
SCHEMA_REGISTRY_LISTENERS: http://0.0.0.0:8081

# For connecting to offline store
connect:
image: confluentinc/cp-kafka-connect:7.5.0
# hostname: connect
container_name: flink-connect
depends_on:
broker:
condition: service_healthy
schema-registry:
condition: service_healthy
zookeeper:
condition: service_healthy
ports:
- "8083:8083"
environment:
CONNECT_BOOTSTRAP_SERVERS: 'broker:29092'
CONNECT_REST_ADVERTISED_HOST_NAME: connect
CONNECT_REST_PORT: 8083
CONNECT_GROUP_ID: compose-connect-group
CONNECT_CONFIG_STORAGE_TOPIC: docker-connect-configs
CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1
CONNECT_OFFSET_FLUSH_INTERVAL_MS: 10000
CONNECT_OFFSET_STORAGE_TOPIC: docker-connect-offsets
CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1
CONNECT_STATUS_STORAGE_TOPIC: docker-connect-status
CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1
CONNECT_KEY_CONVERTER: io.confluent.connect.avro.AvroConverter
CONNECT_VALUE_CONVERTER: io.confluent.connect.avro.AvroConverter
CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL: http://schema-registry:8081
CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: http://schema-registry:8081
CONNECT_PLUGIN_PATH: '/usr/share/java,/etc/kafka-connect/jars'
volumes:
- $PWD/data_ingestion/kafka_connect/jars/:/etc/kafka-connect/jars

# Confluent control center to manage Kafka
control-center:
image: confluentinc/cp-enterprise-control-center:7.5.0
# hostname: control-center
container_name: flink-control-center
depends_on:
- broker
- schema-registry
- connect
ports:
- "9021:9021"
healthcheck:
test: ["CMD", "curl", "-f", "http://localhost:9021/healthcheck"] # Adjust the URL and options as needed
interval: 30s
timeout: 10s
retries: 3
environment:
CONTROL_CENTER_BOOTSTRAP_SERVERS: 'broker:29092'
CONTROL_CENTER_CONNECT_CONNECT-DEFAULT_CLUSTER: 'connect:8083'
# CONTROL_CENTER_KSQL_KSQLDB1_URL: "http://ksqldb-server:8088"
# CONTROL_CENTER_KSQL_KSQLDB1_ADVERTISED_URL: "http://localhost:8088"
CONTROL_CENTER_SCHEMA_REGISTRY_URL: "http://schema-registry:8081"
CONTROL_CENTER_REPLICATION_FACTOR: 1
CONTROL_CENTER_INTERNAL_TOPICS_PARTITIONS: 1
# CONTROL_CENTER_MONITORING_INTERCEPTOR_TOPIC_PARTITIONS: 1
CONTROL_CENTER_CONNECT_HEALTHCHECK_ENDPOINT: '/connectors'
CONFLUENT_METRICS_TOPIC_REPLICATION: 1
# PORT: 9021

# Offline store
timescaledb:
image: timescale/timescaledb:latest-pg13
command: postgres -c shared_preload_libraries=timescaledb
container_name: flink-timescaledb
ports:
- "5432:5432"
healthcheck:
test: ['CMD', 'psql', '-U', 'k6', '-c', 'SELECT 1']
interval: 10s
timeout: 5s
retries: 5
environment:
- PGDATA=/var/lib/postgresql/data/timescaledb
- POSTGRES_DB=k6
- POSTGRES_USER=k6
- POSTGRES_PASSWORD=k6
volumes:
- pgdata:/var/lib/postgresql/data

# Simulation of sending messages to Kafka topics
kafka_producer:
build:
context: data_ingestion
dockerfile: kafka_producer/Dockerfile
depends_on:
broker:
condition: service_healthy
timescaledb:
condition: service_healthy
container_name: flink-kafka-producer

volumes:
# Volume for TimescaleDB
pgdata: