Filtering
Learn how to use the filter operator in Kafka Streams topology.
We'll cover the following
Now that we have our incoming Track
objects properly deserialized, we’ll move on to fulfill the second requirement: tracks that have been listened to for less than 30 seconds should be disregarded.
To achieve that, we will use the filter
operator. Using the filter
operator on a stream creates a new stream that consists only of records satisfying a condition provided to the filter. Records that do not meet the condition are dropped and are not further processed.
The filter
operator
This condition is better known as a predicate, which is simply a boolean expression that accepts the record’s key and value for arguments. If the predicate returns true
, then the record is kept. If the predicate returns false
, then the record is dropped. This is not different from the Java 8 Stream API filter operator, different only in that the predicate to Kafka Streams’ filter accepts two arguments instead of one.
Our predicate should be simple—it should return true
if the value of the secondsListened
is more than 30
. We should add the filter right after the peek in a fluent style in line 19. After adding the filter, the stream should look like this:
Get hands-on with 1200+ tech skills courses.