
Kafka Streams: A stream processing guide

Learn about Kafka Streams, its key concepts and highlights, and build a simple streaming and word-count application using Kafka Streams in Scala.

Priyadarshan Mohanty
July 01, 2020
4 min read


Kafka Streams is a Java library for building stream processing applications on top of Kafka. A basic understanding of Kafka will help you learn Kafka Streams better, and if you've worked with Kafka before, Kafka Streams will be easy to pick up.

What is Kafka Streams?

Kafka Streams is a library for building streaming applications, specifically applications that transform Kafka input topics into Kafka output topics. It lets you do this in a way that is distributed and fault-tolerant, with succinct code.

What is stream processing?

Stream processing is the ongoing, concurrent, and record-by-record real-time processing of data.

Let us get started with some highlights of Kafka Streams:

  • Low Barrier to Entry: Quickly write and run a small-scale POC on a single instance. You only need to run multiple instances of the application on various machines to scale up to high-volume production workloads.
  • Lightweight and straightforward client library: Can be easily embedded in any Java application and integrated with any existing packaging, deployment, and operational tools.
  • No external dependencies on systems other than Apache Kafka itself
  • Fault-tolerant local state: Enables fast and efficient stateful operations like windowed joins and aggregations
  • Supports exactly-once processing: Each record will be processed once and only once, even when there is a failure.
  • One-record-at-a-time processing achieves millisecond processing latency, and event-time-based windowing operations handle records that arrive out of order (a short windowing sketch follows this list).
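
To make the windowing point concrete, here is a minimal sketch (not part of the original walkthrough) that counts records per key in one-minute event-time windows. It uses the Kafka Streams DSL for Scala and its implicit serdes, both introduced in the setup section below; the "events" topic name is hypothetical.

import org.apache.kafka.streams.kstream.TimeWindows
import org.apache.kafka.streams.scala.StreamsBuilder
import org.apache.kafka.streams.scala.ImplicitConversions._
import org.apache.kafka.streams.scala.Serdes._

// Hypothetical "events" topic with String keys and values
val builder = new StreamsBuilder
val windowedCounts = builder
  .stream[String, String]("events")
  .groupByKey
  .windowedBy(TimeWindows.of(60000L)) // one-minute windows; the 2.0 API takes milliseconds
  .count()                            // KTable[Windowed[String], Long]: per-key count per window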

Kafka Streams Concepts:

  • Stream: An ordered, replayable, and fault-tolerant sequence of immutable data records, where each data record is defined as a key-value pair.
  • Stream Processor: A node in the processor topology that represents a processing step: it receives one input record at a time from its sources in the topology, applies an operation to it, and may subsequently produce one or more output records to its sinks.

There are two special types of processors in a topology:

  • Source Processor: A source processor is a special type of stream processor that does not have any upstream processors. It produces an input stream for its topology from one or multiple Kafka topics by consuming records from these topics and forwarding them to its downstream processors.
  • Sink Processor: A sink processor is a special type of stream processor that does not have downstream processors. It sends any records received from its upstream processors to a specified Kafka topic (both are illustrated in the sketch below).

Topology Example (diagram)
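
To make the source and sink processor terms concrete, here is a minimal sketch using Kafka's low-level Topology (Processor) API; the processor names are arbitrary, and the rest of this post uses the higher-level DSL instead.

import org.apache.kafka.streams.Topology

// One source processor reading from inputTopic and one sink processor writing to
// outputTopic, wired directly together (no intermediate stream processors)
val topology = new Topology()
topology.addSource("Source", "inputTopic")
topology.addSink("Sink", "outputTopic", "Source")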

  • KStream: A KStream is just that, a Kafka stream. It's a never-ending flow of data, where each piece of data, a record or a fact, is a key-value pair. Data records in a record stream are always interpreted as an "INSERT".
  • KTable: A KTable is an abstraction of a changelog stream, where only the latest value for each key is kept. Data records in a changelog stream are always interpreted as an "UPDATE": the latest record for a key overwrites any earlier one.

There is actually a close relationship between streams and tables, the so-called stream-table duality.
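
As a quick sketch of the duality (using the Scala DSL set up in the next section and a hypothetical "user-clicks" topic), the same topic can be read either way:

import org.apache.kafka.streams.scala.StreamsBuilder
import org.apache.kafka.streams.scala.kstream.{KStream, KTable}
import org.apache.kafka.streams.scala.ImplicitConversions._
import org.apache.kafka.streams.scala.Serdes._

// As a KStream, ("alice", "1") followed by ("alice", "3") are two independent INSERTs;
// as a KTable, the second record is an UPDATE, so the table ends up holding ("alice", "3")
val builder = new StreamsBuilder
val clickStream: KStream[String, String] = builder.stream[String, String]("user-clicks")
val clickTable: KTable[String, String] = builder.table[String, String]("user-clicks")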

Implementing Kafka Streams

Let's start with the setup, using Scala instead of Java. The Kafka Streams DSL for Scala library is a wrapper over the existing Java APIs for the Kafka Streams DSL.

To set things up, we need to create a KafkaStreams instance. It needs a topology and a configuration (java.util.Properties). We also need an input topic and an output topic. Let's walk through a simple example of sending data from an input topic to an output topic using the Streams API.

You can create the topics using the commands below (Kafka needs to be pre-installed):

kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic inputTopic
kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic outputTopic
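
The snippets below assume the following imports and topic-name values (a consolidated list covering both the pipe and the word-count examples; ImplicitConversions provides the consumedFromSerde, producedFromSerde, serializedFromSerde, and materializedFromSerde helpers used to pass serdes explicitly):

import java.util.Properties
import java.util.concurrent.TimeUnit

import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.serialization.Serdes
import org.apache.kafka.streams.{KafkaStreams, StreamsConfig}
import org.apache.kafka.streams.scala.StreamsBuilder
import org.apache.kafka.streams.scala.ImplicitConversions._
import org.apache.kafka.streams.scala.kstream.{KStream, KTable}

// Topic names matching the kafka-topics commands above
val inputTopic = "inputTopic"
val outputTopic = "outputTopic"

Next, define the configuration for the Streams application: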
val config: Properties = {
  val properties = new Properties()
  properties.put(StreamsConfig.APPLICATION_ID_CONFIG, "your-application")
  properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
  properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest")
  properties.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE)
  // Default serdes are configured by class name, not by serde instance
  properties.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass.getName)
  properties.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass.getName)
  properties
}

StreamsBuilder provides the high-level Kafka Streams DSL to specify a Kafka Streams topology.

val builder: StreamsBuilder = new StreamsBuilder

Create a KStream from the specified topic.

val inputStream: KStream[String, String] =
  builder.stream[String, String](inputTopic)(consumedFromSerde(Serdes.String(), Serdes.String()))

Send the input stream to the output topic.

inputStream.to(outputTopic)(producedFromSerde(Serdes.String(), Serdes.String()))

Start the Streams application.

val kEventStream = new KafkaStreams(builder.build(), config)
kEventStream.start()
sys.ShutdownHookThread {
  kEventStream.close(10, TimeUnit.SECONDS)
}

You can send data to the input topic using

kafka-console-producer --broker-list localhost:9092 --topic inputTopic

And you can fetch the data from the output topic using

kafka-console-consumer --bootstrap-server localhost:9092 --topic outputTopic --from-beginning

You can add the necessary dependencies to your sbt build file or Maven POM. Below is an example for build.sbt.

// Kafka
libraryDependencies += "org.apache.kafka" %% "kafka-streams-scala" % "2.0.0"
libraryDependencies += "javax.ws.rs" % "javax.ws.rs-api" % "2.1" artifacts( Artifact("javax.ws.rs-api", "jar", "jar")) // this is a workaround. There is an upstream dependency that causes trouble in SBT builds.

Let us modify the code a little to try out a WordCount example. The code splits each sentence into words, groups by word as the key with the number of occurrences (the count) as the value, and sends the result to the output topic by converting the KTable into a KStream.

import org.apache.kafka.streams.scala.{Serdes => ScalaSerdes} // Scala-typed serdes; count() produces Scala Long values

val textLines: KStream[String, String] =
  builder.stream[String, String](inputTopic)(consumedFromSerde(Serdes.String(), Serdes.String()))
val wordCounts: KTable[String, Long] = textLines
  .flatMapValues(textLine => textLine.toLowerCase.split("\\W+"))
  .groupBy((_, word) => word)(serializedFromSerde(Serdes.String(), Serdes.String()))
  .count()(materializedFromSerde(ScalaSerdes.String, ScalaSerdes.Long))
wordCounts.toStream.to(outputTopic)(producedFromSerde(ScalaSerdes.String, ScalaSerdes.Long))
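
To sanity-check the word-count topology without a running broker, you can use Kafka's TopologyTestDriver from the kafka-streams-test-utils artifact ("org.apache.kafka" % "kafka-streams-test-utils" % "2.0.0" % Test). This is a minimal sketch under a few assumptions: the builder holds only the word-count topology above, the imports and topic names from the setup section are in scope, and the broker address is a placeholder since the test driver never connects to one.

import org.apache.kafka.common.serialization.{LongDeserializer, StringDeserializer, StringSerializer}
import org.apache.kafka.streams.TopologyTestDriver
import org.apache.kafka.streams.test.ConsumerRecordFactory

val testProps = new Properties()
testProps.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-test")
testProps.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:9092") // placeholder; never contacted
testProps.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, "0")  // disable caching so every update is emitted

val driver = new TopologyTestDriver(builder.build(), testProps)
val factory = new ConsumerRecordFactory[String, String](inputTopic, new StringSerializer, new StringSerializer)
driver.pipeInput(factory.create(inputTopic, "someKey", "kafka streams kafka"))

// Drain the updates written to the output topic; "kafka" should end up with a count of 2
var output = driver.readOutput(outputTopic, new StringDeserializer, new LongDeserializer)
while (output != null) {
  println(s"${output.key} -> ${output.value}")
  output = driver.readOutput(outputTopic, new StringDeserializer, new LongDeserializer)
}
driver.close()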

With the above process, we can now implement a simple streaming application or a word-count application using Kafka Streams in Scala. If you want to set up Kafka on Windows, here is a quick guide to implementing Apache Kafka on Windows OS.




Priyadarshan Mohanty

Priyadarshan Mohanty is a Data Engineer at LoginRadius and has experience working with Big Data technologies like Hadoop, Hive, Spark, and Kafka. He is interested in database technologies and distributed systems.
