Stream Processing using KSQL

This project shows how to use KSQL (the Streaming SQL Engine for Apache Kafka) for stream processing.

Environment

To use this repository you will need the following software: Python, Pip, Docker, and Docker Compose.

However, only Docker and Docker Compose need to be installed on your machine. The entire Kafka ecosystem runs inside Docker images.

Steps

  1. Install Python and Pip
  2. Install Docker and Docker Compose
  3. Load Images
  4. Create Topics
  5. Start Simulator

1 - Install Python and Pip

The installation of Python and Pip is straightforward, so this tutorial won't cover those steps. For more information, see www.python.org and pip.pypa.io.

After installing Python and Pip, run the command below to install the dependencies needed to execute the click_simulator.py application. This script simulates click events on a web page: it generates an unbounded stream of click events, sending a continuous flow of messages to a Kafka topic.

pip install -r requirements.txt
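For context, the behaviour of click_simulator.py can be sketched roughly as follows. This is a hypothetical, minimal version, not the project's actual code; the kafka-python client and the helper names are assumptions:

```python
import json
import random
import time
from datetime import datetime, timezone

TOPIC = "com.mywebsite.streams.clickevents"  # topic created in step 4

def make_click_event():
    """Build one fake click event with the same fields the real simulator emits."""
    return {
        "email": f"user{random.randint(1, 999)}@example.com",
        "timestamp": datetime.now(timezone.utc).isoformat(timespec="seconds"),
        "uri": f"http://www.{random.choice('abk')}site.com/index.html",
        "number": random.randint(1, 1000),
    }

def run_simulator():
    """Publish an unbounded stream of click events.

    Requires a running broker and `pip install kafka-python` (an assumption;
    the project's actual client library may differ).
    """
    from kafka import KafkaProducer

    producer = KafkaProducer(
        bootstrap_servers="localhost:9092",
        value_serializer=lambda v: json.dumps(v).encode("utf-8"),
    )
    while True:  # unbounded: one click event per second
        event = make_click_event()
        producer.send(TOPIC, event)
        print(f"Message: {json.dumps(event)}")
        time.sleep(1)
```

With the Docker stack from step 3 up, calling run_simulator() publishes one JSON event per second to the click-events topic.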

2 - Install Docker and Docker Compose

This tutorial does not demonstrate the installation process for Docker and Docker Compose. I strongly recommend visiting the official Docker installation documentation for more information.

3 - Load Images

docker-compose up

or

docker-compose up -d

The latter command runs docker-compose in the background (detached mode).

4 - Create Topics

docker-compose exec kafka kafka-topics --create --topic com.mywebsite.streams.pages --bootstrap-server localhost:9092
docker-compose exec kafka kafka-topics --create --topic com.mywebsite.streams.clickevents --bootstrap-server localhost:9092

5 - Start Simulator

python click_simulator.py

If you executed all steps correctly, you will see output similar to the following:

Starting application
Message: {"email": "anoble@yahoo.com", "timestamp": "1986-03-10T16:38:40", "uri": "https://mitchell.info/login.php", "number": 358}
Message: {"email": "leonardpatrick@mason-clark.info", "timestamp": "1971-04-25T10:09:26", "uri": "https://www.bailey.com/search/about/", "number": 431}
Message: {"email": "morriskatie@villarreal-villa.biz", "timestamp": "1996-11-22T00:12:20", "uri": "http://www.woodard.info/terms.php", "number": 838}
Message: {"email": "kenneth79@rogers.info", "timestamp": "2005-10-24T22:16:59", "uri": "http://www.king.com/wp-content/blog/blog/index/", "number": 793}
Message: {"email": "wbailey@wu-martinez.net", "timestamp": "1995-06-20T12:44:44", "uri": "https://www.smith-neal.com/categories/login/", "number": 509}
Message: {"email": "tkennedy@hall-wolfe.org", "timestamp": "2009-01-27T14:04:20", "uri": "https://www.marshall-holmes.info/", "number": 336}
Message: {"email": "steven15@yahoo.com", "timestamp": "2019-12-13T16:09:11", "uri": "https://www.sims.net/main.html", "number": 263}
Message: {"email": "hobbsmario@hotmail.com", "timestamp": "1990-08-16T05:09:04", "uri": "http://www.smith.com/search/tags/explore/about.jsp", "number": 61}
...

Connecting to KSQL Server

docker-compose exec ksql ksql http://localhost:8088

After you connect to the KSQL Server you will see the banner below:

                  ===========================================
                  =        _  __ _____  ____  _             =
                  =       | |/ // ____|/ __ \| |            =
                  =       | ' /| (___ | |  | | |            =
                  =       |  <  \___ \| |  | | |            =
                  =       | . \ ____) | |__| | |____        =
                  =       |_|\_\_____/ \___\_\______|       =
                  =                                         =
                  =  Streaming SQL Engine for Apache Kafka® =
                  ===========================================

Copyright 2017-2019 Confluent Inc.

CLI v5.4.1, Server v5.4.1 located at http://localhost:8088

Having trouble? Type 'help' (case-insensitive) for a rundown of how things work!

ksql>

Some Commands

Show all topics

ksql> SHOW TOPICS;

 Kafka Topic                       | Partitions | Partition Replicas
---------------------------------------------------------------------
 com.mywebsite.streams.clickevents | 5          | 1
 com.mywebsite.streams.pages       | 1          | 1
---------------------------------------------------------------------

Show all streams

ksql> SHOW STREAMS;

 Stream Name | Kafka Topic                        | Format
-----------------------------------------------------------
 CLICKEVENTS | com.mywebsite.streams.clickevents  | JSON
-----------------------------------------------------------

Creating a Stream

To create a stream over the click-events topic, run:

CREATE STREAM clickevents
  (email VARCHAR,
  timestamp VARCHAR,
  uri VARCHAR,
  number INTEGER)
WITH (KAFKA_TOPIC='com.mywebsite.streams.clickevents',
  VALUE_FORMAT='JSON');

Creating a Table

CREATE TABLE pages
  (uri VARCHAR,
   description VARCHAR,
   created VARCHAR)
  WITH (KAFKA_TOPIC='com.mywebsite.streams.pages',
        VALUE_FORMAT='JSON',
        KEY='uri');

Creating a Table from a Query

CREATE TABLE a_pages AS
  SELECT * FROM pages WHERE uri LIKE 'http://www.a%';

Querying a Table or Stream

SELECT * FROM clickevents EMIT CHANGES;

Describing a Table and Stream

ksql> DESCRIBE PAGES;

Name                 : PAGES
 Field       | Type
-----------------------------------------
 ROWTIME     | BIGINT           (system)
 ROWKEY      | VARCHAR(STRING)  (system)
 URI         | VARCHAR(STRING)
 DESCRIPTION | VARCHAR(STRING)
 CREATED     | VARCHAR(STRING)
-----------------------------------------
For runtime statistics and query details run: DESCRIBE EXTENDED <Stream,Table>;

Managing Offsets

Like all Kafka consumers, KSQL by default begins consumption at the latest offset. This can be a problem in some scenarios. In the following example we're going to create a pages table, but we want all the data available to us in this table. In other words, we want KSQL to start from the earliest offset. To do this, we will use the SET command to set the configuration variable auto.offset.reset for our session, before we run any other commands.

SET 'auto.offset.reset' = 'earliest';

Also note that this can be set at the KSQL server level, if you'd like. Once you're done querying or creating tables or streams with this value, you can set it back to its original setting by simply running:

UNSET 'auto.offset.reset';

Scalar Functions

KSQL provides a number of scalar functions for us to make use of.

Let's write a query that takes advantage of some of them:

SELECT UCASE(SUBSTRING(uri, 12))
  FROM clickevents
  WHERE number > 100
    AND uri LIKE 'http://www.k%' EMIT CHANGES;
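As a side note, SUBSTRING in KSQL is 1-indexed: SUBSTRING(uri, 12) keeps everything from the 12th character onward, stripping the 11-character 'http://www.' prefix. A quick Python equivalent of the projection (illustrative only; the function name is made up):

```python
def shorten_and_upper(uri: str) -> str:
    """Python equivalent of UCASE(SUBSTRING(uri, 12)): drop the first 11
    characters (KSQL's SUBSTRING is 1-based) and upper-case the rest."""
    return uri[11:].upper()

print(shorten_and_upper("http://www.king.com/index.html"))  # KING.COM/INDEX.HTML
```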

Notice that as soon as you hit CTRL+C, the query ends.

Deleting a Table

As with Streams, we must first find the running underlying query, and then drop the table. First, find your query:

ksql> SHOW QUERIES;

 Query ID       | Kafka Topic | Query String
------------------------------------------------------------------------------
 CTAS_A_PAGES_1 | A_PAGES     | CREATE TABLE a_pages AS SELECT * FROM pages WHERE uri LIKE 'http://www.a%';
------------------------------------------------------------------------------
For detailed information on a Query run: EXPLAIN <Query ID>;

Find your query, which in this case is CTAS_A_PAGES_1 and then, finally, TERMINATE the query and DROP the table:

TERMINATE QUERY CTAS_A_PAGES_1;
DROP TABLE A_PAGES;

Windowing

Hopping and Tumbling Windows

In this demonstration we'll see how to create Tables with windowing enabled.

Tumbling Windows

Let's create a tumbling clickevents table, where the window size is 30 seconds. (In KSQL a window clause must be paired with an aggregation, so we count clicks per uri.)

CREATE TABLE clickevents_tumbling AS
  SELECT uri, COUNT(*) AS clicks
  FROM clickevents
  WINDOW TUMBLING (SIZE 30 SECONDS)
  GROUP BY uri;

Hopping Windows

Now we can create a table with a hopping window of 30 seconds that advances in 5-second increments.

CREATE TABLE clickevents_hopping AS
  SELECT uri, COUNT(*) AS clicks
  FROM clickevents
  WINDOW HOPPING (SIZE 30 SECONDS, ADVANCE BY 5 SECONDS)
  WHERE uri LIKE 'http://www.b%'
  GROUP BY uri;

The above window is 30 seconds long and advances by 5 seconds. If you query the table you will see the associated window times!
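To build intuition for how events map to windows, here is a small Python sketch (timestamps in seconds; illustrative only, not KSQL internals):

```python
def tumbling_window_start(ts, size):
    """A tumbling window is fixed and non-overlapping: each timestamp
    belongs to exactly one window, the one starting at the nearest
    multiple of `size` at or before ts."""
    return ts - ts % size

def hopping_window_starts(ts, size, advance):
    """Hopping windows overlap, so one timestamp can fall into several
    windows: every window starting at a multiple of `advance` whose
    [start, start + size) range covers ts."""
    latest = ts - ts % advance                 # last window starting at or before ts
    earliest = max(0, latest - size + advance) # oldest window still covering ts
    return list(range(earliest, latest + 1, advance))

print(tumbling_window_start(95, 30))     # 90
print(hopping_window_starts(95, 30, 5))  # [70, 75, 80, 85, 90, 95]
```

Note that with a 30-second window advancing every 5 seconds, each event lands in 30 / 5 = 6 windows, which is why hopping tables grow faster than tumbling ones.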

Session Windows

Finally, let's see how session windows work. We're going to define the session gap as 5 minutes, so that many events are grouped into the same window.

CREATE TABLE clickevents_session AS
  SELECT uri, COUNT(*) AS clicks
  FROM clickevents
  WINDOW SESSION (5 MINUTES)
  WHERE uri LIKE 'http://www.b%'
  GROUP BY uri;
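Session-window semantics can likewise be sketched in Python: events are grouped into the same session as long as the gap between consecutive events stays within the inactivity threshold (illustrative only, not KSQL internals):

```python
def sessionize(timestamps, gap=300):
    """Group event timestamps (seconds) into sessions: a new session
    starts whenever the gap since the previous event exceeds `gap`
    (default 300 s = the 5-minute window used above)."""
    sessions = []
    for ts in sorted(timestamps):
        if sessions and ts - sessions[-1][-1] <= gap:
            sessions[-1].append(ts)  # within the gap: extend current session
        else:
            sessions.append([ts])    # gap exceeded: open a new session
    return sessions

print(sessionize([0, 100, 250, 1000, 1100]))  # [[0, 100, 250], [1000, 1100]]
```

Unlike tumbling and hopping windows, session windows have no fixed size; their bounds are driven entirely by the data.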

Kafka CLI Basic Commands

Creating a topic:

docker-compose exec kafka kafka-topics --create --topic <topic-name> --bootstrap-server localhost:9092

Writing to a topic:

docker-compose exec kafka kafka-console-producer --topic <topic-name> --bootstrap-server localhost:9092

Type each message in the console and press Enter to send it.

Reading a topic:

docker-compose exec kafka kafka-console-consumer --topic <topic-name> --from-beginning --bootstrap-server localhost:9092

Contributing

Pull requests are welcome. For major changes, please open an issue first to discuss what you would like to change.

Please make sure to update tests as appropriate.

License

MIT
