The main goal of this project is to show how KSQL (https://www.confluent.io/product/ksql/) works. KSQL is the streaming SQL engine for Apache Kafka; it enables real-time data processing using SQL statements.
Below you will find some guidance on setting up the project to run locally.
To start all the servers, I suggest using Confluent Platform (I used version 5.2.1).
You can download it from https://www.confluent.io/download/. After extracting it, add its bin directory to the PATH variable.
To start all four servers (ZooKeeper, Kafka, Schema Registry, and KSQL Server), run:
$ confluent start ksql-server
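Startup takes a few seconds, so it can help to confirm that the KSQL server is accepting requests before creating any streams. The sketch below polls the `/info` endpoint of KSQL's REST API; it assumes the default listener at http://localhost:8088, and the function name `ksql_server_info` and its retry parameters are illustrative.

```python
import json
import time
import urllib.error
import urllib.request


def ksql_server_info(base_url="http://localhost:8088", attempts=10, delay=2.0, timeout=2.0):
    """Poll the KSQL server's /info endpoint until it answers.

    Returns the decoded JSON on success, or None if every attempt fails.
    The default URL assumes KSQL's standard port 8088.
    """
    for attempt in range(attempts):
        try:
            with urllib.request.urlopen(base_url + "/info", timeout=timeout) as resp:
                return json.loads(resp.read().decode("utf-8"))
        except (urllib.error.URLError, OSError, ValueError):
            # Not reachable yet, an HTTP error, or a non-JSON reply: retry after a pause.
            if attempt < attempts - 1:
                time.sleep(delay)
    return None
```

Calling `ksql_server_info()` after `confluent start ksql-server` returns the server's info document once it is up, or `None` after roughly 20 seconds of failed attempts with the defaults.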
To get tweets from Twitter, you need to set two environment variables: twitter.consumerKey and twitter.consumerSecret.
You can obtain these values by creating a new app at https://developer.twitter.com/.
To get the current song from Spotify, you need three variables: spotify.refreshToken, spotify.clientId, and spotify.clientSecret.
You can find spotify.clientId and spotify.clientSecret on the Dashboard page at https://developer.spotify.com/.
Tip: the Spotify form will ask you for a "redirect URI"; you can enter something like http://localhost/callback. It does not matter whether anything is actually running at that address; for this purpose it will work.
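A missing credential is easy to overlook, so a quick check of all five settings can save a confusing failure later. The sketch below assumes the application reads them from the process environment under exactly these dotted names; the helper `missing_settings` is hypothetical and not part of this project.

```python
import os

# Setting names exactly as listed in this README (dots included).
REQUIRED_SETTINGS = (
    "twitter.consumerKey",
    "twitter.consumerSecret",
    "spotify.refreshToken",
    "spotify.clientId",
    "spotify.clientSecret",
)


def missing_settings(environ=None):
    """Return the required settings that are absent or empty.

    `environ` defaults to the real process environment; any mapping works,
    which makes the check easy to exercise without touching os.environ.
    """
    env = os.environ if environ is None else environ
    return [name for name in REQUIRED_SETTINGS if not env.get(name)]
```

Because the names contain dots, Bash's `export` rejects them as invalid identifiers. `env` still passes them to a child process, for example `env 'twitter.consumerKey=<key>' 'twitter.consumerSecret=<secret>' <command that starts the app>`.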
To generate spotify.refreshToken, you go through Spotify's OAuth Authorization Code Flow (https://developer.spotify.com/documentation/general/guides/authorization-guide/).
As this is a demo application, you can use Postman (https://www.getpostman.com/) to get the refresh token.
- Getting the code
Open Google Chrome and enter the URL below, adding your client_id and the redirect_uri that you used when you created the app:
After you authorize the app, Chrome's address bar will show a URL like: http://localhost/callback?code=<the_code_is_shown_here>
Copy the code and save it; you'll need it to generate the refresh token.
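The two steps above can also be scripted. The sketch below follows the parameters of Spotify's documented /authorize endpoint (client_id, response_type=code, redirect_uri, scope) and may differ from the URL the author used; in particular, the default scope user-read-currently-playing is an assumption based on the app needing the current song. The function names are hypothetical.

```python
from urllib.parse import parse_qs, urlencode, urlparse

AUTHORIZE_URL = "https://accounts.spotify.com/authorize"


def build_authorize_url(client_id, redirect_uri, scope="user-read-currently-playing"):
    """Build the Authorization Code Flow URL to open in the browser.

    The default scope is an assumption: the Spotify scope for reading
    the user's currently playing track.
    """
    query = urlencode({
        "client_id": client_id,
        "response_type": "code",
        "redirect_uri": redirect_uri,
        "scope": scope,
    })
    return AUTHORIZE_URL + "?" + query


def extract_code(callback_url):
    """Pull the `code` query parameter out of the URL the browser lands on."""
    params = parse_qs(urlparse(callback_url).query)
    if "code" not in params:
        # Spotify sends ?error=... instead of ?code=... when access is denied.
        raise ValueError("no code parameter in " + callback_url)
    return params["code"][0]
```

The redirect_uri passed here must be exactly the one registered for the app, because Spotify checks it again when the code is exchanged.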
- Getting the refresh token
Now use Postman to create a POST request to https://accounts.spotify.com/api/token.
Set the Content-Type header to application/x-www-form-urlencoded.
Add Basic Auth with your clientId as the username and your clientSecret as the password.
In the body, add three parameters:
- grant_type with the value authorization_code
- code with the value you got earlier with Chrome
- redirect_uri with the value you used when you created the app
Send the request. The response JSON will contain a refresh_token field; use its value as spotify.refreshToken.
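The Postman steps above amount to one form-encoded POST with Basic Auth. The sketch below builds the same request with the standard library, without sending it; the function names are hypothetical. It also builds the grant_type=refresh_token request, which is how Spotify's Authorization Code Flow trades a stored refresh token for a new access token; whether this project uses exactly that call is an assumption.

```python
import base64
import json
from urllib.parse import urlencode
from urllib.request import Request

TOKEN_URL = "https://accounts.spotify.com/api/token"


def _token_request(client_id, client_secret, form):
    """Build (but do not send) a POST to Spotify's token endpoint."""
    credentials = f"{client_id}:{client_secret}".encode("utf-8")
    headers = {
        # Basic Auth header: base64("clientId:clientSecret").
        "Authorization": "Basic " + base64.b64encode(credentials).decode("ascii"),
        "Content-Type": "application/x-www-form-urlencoded",
    }
    body = urlencode(form).encode("ascii")
    return Request(TOKEN_URL, data=body, headers=headers, method="POST")


def build_code_exchange(client_id, client_secret, code, redirect_uri):
    """Same request as the Postman steps: trade the code for tokens."""
    return _token_request(client_id, client_secret, {
        "grant_type": "authorization_code",
        "code": code,
        "redirect_uri": redirect_uri,
    })


def build_refresh(client_id, client_secret, refresh_token):
    """Trade the long-lived refresh token for a fresh access token."""
    return _token_request(client_id, client_secret, {
        "grant_type": "refresh_token",
        "refresh_token": refresh_token,
    })


def read_refresh_token(response_body):
    """Extract refresh_token from the JSON returned by the code exchange."""
    return json.loads(response_body)["refresh_token"]
```

Passing either request to `urllib.request.urlopen` sends it. An authorization code can be exchanged only once, so if the exchange fails you need to repeat the "Getting the code" step.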
You can generate test guesses directly into a topic using the datagen/guesses.avro file with ksql-datagen:
$ ksql-datagen schema=./datagen/guesses.avro format=json topic=guesses key=user maxInterval=5000 iterations=10000
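This README does not show the contents of datagen/guesses.avro, so the sketch below only mirrors the two columns that GUESSES_WRAPPER declares (SONG and USER), keyed by user as in the ksql-datagen command. The sample songs and users are made up, and upper-case field names are used so they match the KSQL column names exactly.

```python
import json
import random

# Hypothetical sample data; real guesses come from datagen/guesses.avro.
SAMPLE_SONGS = ["Bohemian Rhapsody", "Hey Jude", "Smells Like Teen Spirit"]
SAMPLE_USERS = ["alice", "bob", "carol"]


def make_guess(rng):
    """Return one (key, value) pair shaped like a record in the guesses topic.

    The value carries the columns GUESSES_WRAPPER reads (SONG, USER);
    the key is the user, mirroring key=user in the ksql-datagen command.
    """
    user = rng.choice(SAMPLE_USERS)
    value = {"SONG": rng.choice(SAMPLE_SONGS), "USER": user}
    return user, json.dumps(value)


def make_guesses(count, seed=None):
    """Generate `count` guesses; a fixed seed gives a repeatable sequence."""
    rng = random.Random(seed)
    return [make_guess(rng) for _ in range(count)]
```

Any Kafka producer could write these pairs to the guesses topic, using the first element as the message key and the second as the JSON value.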
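In KSQL (for example, from the KSQL CLI started with the ksql command), create the following streams and tables to see the winners. Run them in the order shown, since each statement reads from one created before it.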
Current Song:
CREATE STREAM SONG_WRAPPER (NAME VARCHAR, ARTIST VARCHAR) WITH (KAFKA_TOPIC='current_song', VALUE_FORMAT='JSON');
CREATE STREAM SONG_STAGE_1 with (partitions=1) AS SELECT 'CURRENT_SONG' AS CURRENT_SONG, UCASE(NAME) AS SONG_KEY, NAME, ARTIST FROM SONG_WRAPPER PARTITION BY CURRENT_SONG;
CREATE TABLE SONG (CURRENT_SONG VARCHAR, SONG_KEY VARCHAR, NAME VARCHAR, ARTIST VARCHAR) WITH (KAFKA_TOPIC='SONG_STAGE_1', VALUE_FORMAT='JSON', KEY='CURRENT_SONG');
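SONG_STAGE_1 gives every song event the same key, the literal 'CURRENT_SONG', and the SONG table over its topic keeps only the latest row per key. The table therefore holds a single row: the most recent song. UCASE(NAME) is kept as SONG_KEY so guesses can be compared regardless of case. The plain-Python model below only illustrates that behavior; it is not how KSQL works internally, and `str.upper` stands in for UCASE.

```python
def song_stage_1(event):
    """Model of SONG_STAGE_1: add the constant key and an upper-cased match key."""
    return {
        "CURRENT_SONG": "CURRENT_SONG",
        "SONG_KEY": event["NAME"].upper(),
        "NAME": event["NAME"],
        "ARTIST": event["ARTIST"],
    }


def build_song_table(events):
    """Model of the SONG table: one row per key, last write wins."""
    table = {}
    for event in events:
        row = song_stage_1(event)
        # Every event carries the same key, so each one overwrites the previous song.
        table[row["CURRENT_SONG"]] = row
    return table
```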
Winner:
CREATE STREAM GUESSES_WRAPPER (SONG VARCHAR, USER VARCHAR) WITH (KAFKA_TOPIC='guesses', VALUE_FORMAT='JSON');
CREATE STREAM GUESSES_STAGE_1 with (partitions=1) AS SELECT UCASE(SONG) AS SONG_KEY, SONG, USER FROM GUESSES_WRAPPER;
CREATE STREAM GUESSES_STAGE_2 with (partitions=1) AS SELECT SONG_KEY, SONG, USER, 'CURRENT_SONG' AS CURRENT_SONG FROM GUESSES_STAGE_1;
CREATE STREAM GUESSES_STAGE_3 with (partitions=1) AS SELECT ROWTIME AS TIMESTAMP, SONG_KEY, SONG, USER, 'CURRENT_SONG' AS CURRENT_SONG FROM GUESSES_STAGE_2 PARTITION BY TIMESTAMP;
CREATE STREAM GUESSES_STAGE_4 with (partitions=1) as SELECT TIMESTAMP, USER FROM GUESSES_STAGE_3 G LEFT JOIN SONG S ON G.CURRENT_SONG = S.CURRENT_SONG WHERE G.SONG_KEY = S.SONG_KEY;
CREATE STREAM WINNERS with (partitions=1) AS SELECT TIMESTAMP, USER FROM GUESSES_STAGE_4 PARTITION BY TIMESTAMP;
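The winner logic is spread over several statements. The sketch below replays song changes and guesses from one ordered list and applies the same steps in plain Python: upper-case the guess, look up the single row of the song table through the constant CURRENT_SONG key, and keep the guess only when SONG_KEY matches. Treating both inputs as one arrival-ordered list is a simplification; in KSQL the relative order of the two topics depends on record timestamps and processing. The event-dict layout and the function name are hypothetical.

```python
CONSTANT_KEY = "CURRENT_SONG"


def find_winners(events):
    """Replay song changes and guesses in order and return the winner rows.

    Each event is a dict whose "type" is "song" (with NAME, ARTIST) or
    "guess" (with ROWTIME, SONG, USER). A guess is compared with whatever
    song the table holds when the guess arrives, as in a stream-table join.
    """
    song_table = {}  # SONG table: at most one row, under the constant key
    winners = []     # rows of the WINNERS stream
    for event in events:
        if event["type"] == "song":
            # SONG_STAGE_1 -> SONG: re-key to the constant and overwrite.
            song_table[CONSTANT_KEY] = {"SONG_KEY": event["NAME"].upper()}
        elif event["type"] == "guess":
            # GUESSES_STAGE_1..3: upper-case the guess; its join key is the constant.
            song_key = event["SONG"].upper()
            current = song_table.get(CONSTANT_KEY)
            # GUESSES_STAGE_4: join on CURRENT_SONG, then WHERE on SONG_KEY.
            # With no song in the table there is nothing to match, so the guess is dropped.
            if current is not None and current["SONG_KEY"] == song_key:
                winners.append({"TIMESTAMP": event["ROWTIME"], "USER": event["USER"]})
    return winners
```

Every correct guess produces a row, so the earliest TIMESTAMP for a given song identifies the first person to get it right.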