Tumbling Window

SDF provides a tumbling window, which collects entries from the source for a fixed duration and then sends the result to the sink topic. For a more in-depth explanation of windows, read this.

Prerequisites

This guide uses a local Fluvio cluster. If you need to install it, please follow the instructions here.

States

For more background, see the documentation page on states.

Define a state

The following defines a keyed state that stores key-value pairs, mapping a string key to a u32 value.

    states:
      count-by-first-letter:
        type: keyed-state
        properties:
          key:
            type: string
          value:
            type: u32

Define a window

A window is defined with the following syntax:

    window:
      tumbling:
        (...)
      assign-timestamp:
        (...)
      partition:
        assign-key:
          (...)
        update-state:
          (...)
      flush:
        (...)

Five components

1. Add Window duration

    tumbling:
      duration: 15s

The above sets the duration of the window to discrete 15-second blocks.
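
To make "discrete blocks" concrete, here is a minimal sketch of how a timestamp could be mapped to its 15-second window. It assumes windows are aligned to multiples of the duration since the Unix epoch, which is a common convention but is not confirmed by this guide; `WINDOW_MS` and `window_bounds` are illustrative names, not part of SDF.

```rust
/// Window length in milliseconds, matching `duration: 15s`.
const WINDOW_MS: i64 = 15_000;

/// Returns the half-open `[start, end)` bounds of the window containing `ts_ms`.
/// Assumption: windows are aligned to multiples of the duration since the epoch.
fn window_bounds(ts_ms: i64) -> (i64, i64) {
    // rem_euclid keeps the offset non-negative, even for negative timestamps.
    let start = ts_ms - ts_ms.rem_euclid(WINDOW_MS);
    (start, start + WINDOW_MS)
}
```

Under this assumption, every timestamp belongs to exactly one window, and a record stamped at 15000 ms starts a new window rather than ending the previous one.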

2. Assigning a timestamp

    assign-timestamp:
      run: |
        fn assign_event_timestamp(_str: String, time: i64) -> Result<i64> {
          Ok(time)
        }

The above code assigns a timestamp to each entry from the source. Here the function simply returns the time argument it receives. If a timestamp is encoded in the data, the code could use that encoded timestamp instead.
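
As a sketch of the encoded-timestamp case, suppose each record looks like `<unix-millis>,<payload>`. This record format and the helper name `event_timestamp` are assumptions made for illustration; the guide's example data does not carry a timestamp.

```rust
/// Extracts a timestamp from a hypothetical "<unix-millis>,<payload>" record.
/// Falls back to `fallback` when the prefix is missing or not a valid integer.
fn event_timestamp(value: &str, fallback: i64) -> i64 {
    value
        .split_once(',')
        .and_then(|(prefix, _payload)| prefix.trim().parse::<i64>().ok())
        .unwrap_or(fallback)
}
```

Inside the dataflow, a function with the same signature as assign_event_timestamp could then return something like `Ok(event_timestamp(&str, time))`, so malformed records still land in a window based on the time passed in.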

3. Assigning a partition key

    assign-key:
      run: |
        fn key_first_character(str: String) -> Result<String> {
          if str.len() == 0 {
            return Ok("empty".to_string());
          }
          Ok(str[0..1].to_string())
        }

We need a mapping function that takes the string and maps it to a key. In this case, we are writing a dataflow that counts the number of occurrences of each first character within the window, so the code simply extracts the first character as the key.
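
One caveat: in Rust, `str[0..1]` slices by byte offset, so it panics if the first character is longer than one byte (for example `é`). The base64 input used later in this guide is ASCII, so the example is unaffected. For arbitrary UTF-8 input, a variant along these lines may be safer; `first_character_key` is an illustrative name.

```rust
/// Returns the first character of `value` as the key, or "empty" for an
/// empty string. Iterating over chars avoids slicing inside a multi-byte
/// UTF-8 sequence.
fn first_character_key(value: &str) -> String {
    match value.chars().next() {
        Some(c) => c.to_string(),
        None => "empty".to_string(),
    }
}
```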

4. Update the state

    update-state:
      run: |
        fn increment_color_count(str: String) -> Result<()> {
          // Note: the accessor name uses underscores in place of the dashes in the state name.
          count_by_first_letter().increment(1);
          Ok(())
        }

To update the state, we call the accessor function generated for the state defined above. Within update-state, the state instance is automatically scoped to the key assigned to the incoming record. In our case, we call the increment function, so each time a string arrives, the value for its mapped key is incremented.
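
The behavior described above can be modeled outside SDF with an ordinary map. This is only a stand-in to show the per-key counting, not the SDF state API; the type `CountByFirstLetter` and its methods are hypothetical, and the key is passed explicitly here because nothing scopes it automatically.

```rust
use std::collections::HashMap;

/// Stand-in for the keyed state: one u32 counter per string key.
#[derive(Default)]
struct CountByFirstLetter {
    counts: HashMap<String, u32>,
}

impl CountByFirstLetter {
    /// Adds `by` to the counter for `key`, creating it at 0 if absent.
    fn increment(&mut self, key: &str, by: u32) {
        *self.counts.entry(key.to_string()).or_insert(0) += by;
    }

    /// Returns the current count for `key`, or 0 if the key was never seen.
    fn get(&self, key: &str) -> u32 {
        self.counts.get(key).copied().unwrap_or(0)
    }
}
```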

5. Flush the window

The final step is to flush the window. All the contents of the window are popped and output to the sink.

    flush:
      run: |
        fn get_first_character_count() -> Result<String> {
          let cc = count_by_first_letter().clone();
          Ok(cc.into_iter().map(|(letter, count)|
            format!("Sentences with {} occured {} times,",letter,count)
          ).collect())
        }

In the example above, we clone the state object and iterate through its key-value pairs. An output (in this case, a string) is created every time the window flushes.
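
A standalone sketch of the same idea follows. It uses a `BTreeMap` so the output order is deterministic, and it clears the map after formatting on the assumption that each new window starts with empty state; neither detail is confirmed by this guide, and `flush_window` is an illustrative name.

```rust
use std::collections::BTreeMap;

/// Formats every (letter, count) pair into one string, then empties the map.
/// Assumption: each new window starts from empty state.
fn flush_window(counts: &mut BTreeMap<String, u32>) -> String {
    let output: String = counts
        .iter()
        .map(|(letter, count)| format!("Sentences with {} occured {} times,", letter, count))
        .collect();
    counts.clear();
    output
}
```

Note that every entry ends with a comma, so the flushed string also ends with one.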

Running the Example

Copy and paste the following config and save it as dataflow.yaml. The first service, window-collect-service, is assembled from the five parts outlined above. The second service, split-service, is discussed in the section on splitting the collected data.

    # dataflow.yaml
    apiVersion: 0.5.0
    meta:
      name: flat-map-example
      version: 0.1.0
      namespace: examples

    config:
      converter: raw

    topics:
      sentences:
        schema:
          value:
            type: string
      sentence-start-col:
        schema:
          value:
            type: string
      sentence-start:
        schema:
          value:
            type: string

    services:
      window-collect-service:
        sources:
          - type: topic
            id: sentences
        states:
          count-by-first-letter:
            type: keyed-state
            properties:
              key:
                type: string
              value:
                type: u32
        window:
          tumbling:
            duration: 15s
          assign-timestamp:
            run: |
              fn assign_event_timestamp(_str: String, time: i64) -> Result<i64> {
                Ok(time)
              }
          partition:
            assign-key:
              run: |
                fn key_first_character(str: String) -> Result<String> {
                  if str.len() == 0 {
                    return Ok("empty".to_string());
                  }
                  Ok(str[0..1].to_string())
                }
            update-state:
              run: |
                fn increment_color_count(str: String) -> Result<()> {
                  count_by_first_letter().increment(1);
                  Ok(())
                }
          flush:
            run: |
              fn get_first_character_count() -> Result<String> {
                let cc = count_by_first_letter().clone();
                Ok(cc.into_iter().map(|(letter, count)|
                  format!("Sentences with {} occured {} times,",letter,count)
                ).collect())
              }
        sinks:
          - type: topic
            id: sentence-start-col

      split-service:
        sources:
          - type: topic
            id: sentence-start-col
        transforms:
          - operator: flat-map
            run: |
              fn split(input: String) -> Result<Vec<String>> {
                let result: Vec<String> = input.split(',')
                  .map(|s| s.to_string())
                  .collect();
                Ok(result)
              }
        sinks:
          - type: topic
            id: sentence-start

To run the example:

$ sdf run --ephemeral

Produce random sentences with the following bash script.

    while true; do
      random=$(openssl rand -base64 12)
      echo "Random String: $random"
      echo "$random" | fluvio produce sentences
      sleep 1
    done

Consume the sink topic sentence-start-col in another terminal.

$ fluvio consume sentence-start-col -Bd

The output is random, but it should look something like this:

Sentences with e occured 1 times,...

Each window flush produces a single entry in the sink topic.

Splitting the collected data

The above example collects all the data into one object. This can be sufficient for formats like JSON, where the information belongs together in one object. However, breaking the aggregated data into individual records is also possible with a simple flat-map, as seen in the example service split-service.

    transforms:
      - operator: flat-map
        run: |
          fn split(input: String) -> Result<Vec<String>> {
            let result: Vec<String> = input.split(',')
              .map(|s| s.to_string())
              .collect();
            Ok(result)
          }

Consuming the topic sentence-start will show the individual entries as separate records.
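
Because each aggregated entry ends with a comma, Rust's `split(',')` also yields one empty string after the last comma, which the flat-map would pass along as an empty record. If that is unwanted, a variant like the following sketch drops empty pieces; `split_entries` is an illustrative name, and the filtering is an optional refinement rather than part of the original example.

```rust
/// Splits the aggregated string on commas and discards empty pieces,
/// such as the one produced by the trailing comma.
fn split_entries(input: &str) -> Vec<String> {
    input
        .split(',')
        .map(str::trim)
        .filter(|s| !s.is_empty())
        .map(|s| s.to_string())
        .collect()
}
```

The body of this helper could replace the body of split in the flat-map, wrapped in `Ok(...)`.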

Cleanup

Exit the sdf terminal and clean up. The --force flag removes the topics:

$ sdf clean --force

Conclusion

We just covered how to implement tumbling windows.