Tumbling Window
SDF provides a tumbling window that collects entries from the source for a fixed window duration before the aggregated result is sent to the sink topic. For a more in-depth explanation of windows, see the windowing documentation.
Prerequisites
This guide uses a local Fluvio cluster. If you need to install one, please follow the installation instructions.
States
This example uses a keyed state; see the states documentation for more information.
Define a state
The following defines a primitive key-value state.
states:
  count-by-first-letter:
    type: keyed-state
    properties:
      key:
        type: string
      value:
        type: u32
Define a window
We will define a window with the following syntax:
window:
  tumbling:
    (...)
  assign-timestamp:
    (...)
  partition:
    assign-key:
      (...)
    update-state:
      (...)
    flush:
      (...)
Five components
1. Add the window duration
tumbling:
  duration: 15s
This sets the duration of the window to discrete 15-second blocks: for example, events with timestamps in the range 0-14s fall into the first window and 15-29s into the second.
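As a rough illustration of how tumbling windows bucket events, here is a plain-Rust sketch of the arithmetic (not SDF's actual implementation; the function name window_start is hypothetical):

// Which window a non-negative epoch-second timestamp falls into,
// given a fixed window duration in seconds.
fn window_start(ts: i64, duration: i64) -> i64 {
    (ts / duration) * duration
}

// window_start(7, 15) == 0 and window_start(16, 15) == 15,
// so events at t=7s and t=16s land in different 15-second windows.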
2. Assigning a timestamp
assign-timestamp:
  run: |
    fn assign_event_timestamp(_str: String, time: i64) -> Result<i64> {
      Ok(time)
    }
The above code assigns a timestamp to each entry arriving from the source. Here we simply use the event time supplied by SDF; if a timestamp were encoded in the data itself, the function could parse and return that value instead.
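For example, if each record carried its own timestamp as a prefix (a hypothetical <epoch-millis>|<payload> format, not part of this example), the assignment could look something like this sketch:

fn assign_event_timestamp(value: String, time: i64) -> Result<i64> {
  // Hypothetical record format: "<epoch-millis>|<payload>".
  // Fall back to the supplied event time if the prefix is missing or unparsable.
  let ts = value
    .split('|')
    .next()
    .and_then(|prefix| prefix.parse::<i64>().ok())
    .unwrap_or(time);
  Ok(ts)
}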
3. Assigning a partition key
assign-key:
  run: |
    fn key_first_character(str: String) -> Result<String> {
      if str.is_empty() {
        return Ok("empty".to_string());
      }
      // chars().next() avoids panicking on multi-byte UTF-8 characters.
      Ok(str.chars().next().unwrap().to_string())
    }
We need a mapping function that takes the string and maps it to a key. In this case, the dataflow counts how many times each first character appears in the window, so the code simply extracts the first character as the key: "apple" maps to "a", and an empty string maps to "empty".
4. Update the state
update-state:
  run: |
    fn increment_color_count(_str: String) -> Result<()> {
      // Note: the state is named count-by-first-letter, but the generated
      // accessor uses underscores instead of dashes.
      count_by_first_letter().increment(1);
      Ok(())
    }
To update the state, we call the accessor generated from the state defined above. The update-state step automatically scopes the state instance to the current partition key, so we only need to call increment: each time a string arrives, the value mapped to its key is incremented.
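Conceptually, the keyed state behaves like a map from key to counter. A minimal plain-Rust sketch of that behavior, for intuition only (this is not SDF code):

use std::collections::HashMap;

fn main() {
    // Each key gets its own u32 counter, as with the keyed state above.
    let mut count_by_first_letter: HashMap<String, u32> = HashMap::new();
    for sentence in ["apple", "avocado", "banana", ""] {
        let key = sentence
            .chars()
            .next()
            .map(|c| c.to_string())
            .unwrap_or_else(|| "empty".to_string());
        *count_by_first_letter.entry(key).or_insert(0) += 1;
    }
    // Prints counts like {"a": 2, "b": 1, "empty": 1}.
    println!("{count_by_first_letter:?}");
}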
5. Flush the window
The final step is to flush the window. When the window closes, its contents are popped and emitted to the sink.
flush:
  run: |
    fn get_first_character_count() -> Result<String> {
      let cc = count_by_first_letter().clone();
      Ok(cc.into_iter().map(|(letter, count)|
        format!("Sentences with {} occurred {} times,", letter, count)
      ).collect())
    }
In the example above, we clone the state object and iterate over its entries. Because the iterator yields Strings, collect() concatenates them into a single String, so one output record is produced every time the window flushes.
Running the Example
Copy and paste the following config and save it as dataflow.yaml. The first service, window-collect-service, is assembled from the five parts outlined above. The second service, split-service, is referenced later for splitting the collected data.
# dataflow.yaml
apiVersion: 0.5.0
meta:
  name: tumbling-window-example
  version: 0.1.0
  namespace: examples

config:
  converter: raw

topics:
  sentences:
    schema:
      value:
        type: string
  sentence-start-col:
    schema:
      value:
        type: string
  sentence-start:
    schema:
      value:
        type: string

services:
  window-collect-service:
    sources:
      - type: topic
        id: sentences
    states:
      count-by-first-letter:
        type: keyed-state
        properties:
          key:
            type: string
          value:
            type: u32
    window:
      tumbling:
        duration: 15s
      assign-timestamp:
        run: |
          fn assign_event_timestamp(_str: String, time: i64) -> Result<i64> {
            Ok(time)
          }
      partition:
        assign-key:
          run: |
            fn key_first_character(str: String) -> Result<String> {
              if str.is_empty() {
                return Ok("empty".to_string());
              }
              Ok(str.chars().next().unwrap().to_string())
            }
        update-state:
          run: |
            fn increment_color_count(_str: String) -> Result<()> {
              count_by_first_letter().increment(1);
              Ok(())
            }
        flush:
          run: |
            fn get_first_character_count() -> Result<String> {
              let cc = count_by_first_letter().clone();
              Ok(cc.into_iter().map(|(letter, count)|
                format!("Sentences with {} occurred {} times,", letter, count)
              ).collect())
            }
    sinks:
      - type: topic
        id: sentence-start-col

  split-service:
    sources:
      - type: topic
        id: sentence-start-col
    transforms:
      - operator: flat-map
        run: |
          fn split(input: String) -> Result<Vec<String>> {
            let result: Vec<String> = input
              .split(',')
              .map(|s| s.to_string())
              .collect();
            Ok(result)
          }
    sinks:
      - type: topic
        id: sentence-start
To run the example:
$ sdf run --ephemeral
Produce random strings into the sentences topic with the following bash script.
while true; do
random=$(openssl rand -base64 12)
echo "Random String: $random"
echo "$random" | fluvio produce sentences
sleep 1
done
Consume from the sentence-start-col sink topic in another terminal.
$ fluvio consume sentence-start-col -Bd
The output string is random, but it should look something like this:
Sentences with e occurred 1 times,...
The output is a single entry in the sink topic.
Splitting the collected data
The above example collects all the data into one object. This can be sufficient for formats like JSON, where related information lives together in one object. However, breaking the aggregated data up into individual records is also possible with a simple flat-map, as seen in the split-service example service.
transforms:
  - operator: flat-map
    run: |
      fn split(input: String) -> Result<Vec<String>> {
        let result: Vec<String> = input
          .split(',')
          .map(|s| s.to_string())
          .collect();
        Ok(result)
      }
Consuming the sentence-start topic shows the individual entries neatly singled out in that topic.
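For example, consuming it the same way as before:
$ fluvio consume sentence-start -Bd
Each "Sentences with ... occurred ... times" fragment now arrives as its own record instead of one concatenated string.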
Cleanup
Exit the sdf terminal and clean up. The --force flag removes the topics:
$ sdf clean --force
Conclusion
We just saw how to implement tumbling windows: define a keyed state, set the window duration, assign timestamps and partition keys, update the state, and flush the aggregated result to a sink.