State - Arrow Dataframe

States are a key concept in SDF. A state either holds a single value or is divided into multiple partitions based on a key. Each partition's state can be one of several types.

One of these types is arrow-row. For example, the following snippet defines a state, count-per-word, that stores each row in an Arrow dataframe.

    states:
      count-per-word:
        type: keyed-state
        properties:
          key:
            type: string
          value:
            type: arrow-row
            properties:
              count:
                type: u32

Here we define a state, count-per-word, to track the frequency of words. Each key has a row with a single column, count, that stores how many times the word has been seen. For example, suppose we receive the following words: apple, orange, banana, orange, grape, orange, banana.

Then this will be mapped to arrow dataframe as follows:

    _key      count
    apple     1
    orange    3
    banana    2
    grape     1
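
To make the mapping concrete, here is a minimal sketch in plain Rust that models the keyed state with a standard BTreeMap instead of the SDF runtime. The count_words helper is hypothetical and is not part of SDF; it only illustrates how one row per key accumulates a count, whereas SDF keeps these rows in an Arrow dataframe.

```rust
use std::collections::BTreeMap;

/// Hypothetical stand-in for the keyed state: one entry ("row") per word,
/// holding a single `count` value. This is an illustration, not the SDF API.
fn count_words(words: &[&str]) -> BTreeMap<String, u32> {
    let mut state: BTreeMap<String, u32> = BTreeMap::new();
    for word in words {
        // Find the partition for this key, creating it with count 0 if it
        // does not exist yet, then increment it.
        *state.entry(word.to_string()).or_insert(0) += 1;
    }
    state
}
```

Calling count_words with the seven words above produces one entry per distinct word, with orange counted three times.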

To update the state, you can use the update-state operator as below:

    update-state:
      run: |
        fn count_word(_word: String) -> Result<()> {
          let mut state = count_per_word();
          state.count += 1;
          state.update()?;
          Ok(())
        }

Note that the state value is accessed through the count_per_word state function, which is automatically injected by the SDF builder.

When this function is called from the update-state operator, it returns only the value of the current partition's state.

In this example, count_per_word() returns a single row of the dataframe. If the operator sees apple, that row is the first row of the dataframe above.

However, aggregate operators like flush can access the entire state and perform aggregation across all partitions. In this case, the count_per_word state function returns the entire DataFrame, not just individual rows. You can then perform DataFrame operations using the SQL API. The snippet below shows how to use SQL to get the 3 most frequent words.

    flush:
      run: |
        fn aggregate_wordcount() -> Result<TopWords> {
          // In an aggregate operator, the state function returns the whole dataframe.
          let word_counts = count_per_word();

          let top3 = word_counts.sql("select * from count_per_word order by count desc limit 3")?;
          let rows = top3.rows()?;

          let mut top_words = vec![];
          // Resolve the key and count columns once, before iterating.
          let key = top3.key()?;
          let count = top3.col("count")?;

          // Walk the result rows and read each value with its typed accessor.
          while rows.next() {
            let word = rows.str(&key)?;
            let count = rows.u32(&count)?;
            let word_count = WordCount { word, count };
            top_words.push(word_count);
          }

          Ok(top_words)
        }

SQL API

For any state that is a dataframe, you can use the SQL API to perform dataframe operations. SDF uses Polars SQL to run them.
The result of a SQL operation is always a dataframe, so you can chain multiple SQL operations to get the desired result.

The SQL is executed in the context of the dataframe, and the table name is the name of the state, as illustrated below:

let top3 = word_counts.sql("select * from count_per_word order by count desc limit 3")?;
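
As a rough illustration of what this query computes, the sketch below reproduces `order by count desc limit 3` in plain Rust over an in-memory list. The top_n function and the tuple representation of a row are assumptions made for this example; SDF evaluates the query through its SQL engine, not through code like this.

```rust
/// Hypothetical plain-Rust equivalent of
/// `select * from count_per_word order by count desc limit n`.
/// Each tuple is (word, count); this is an illustration, not the SDF API.
fn top_n(mut rows: Vec<(String, u32)>, n: usize) -> Vec<(String, u32)> {
    // Stable sort by count, largest first; ties keep their input order.
    rows.sort_by(|a, b| b.1.cmp(&a.1));
    // Keep at most `n` rows, like LIMIT.
    rows.truncate(n);
    rows
}
```

Ties here keep their input order because the sort is stable. A SQL engine makes no such promise unless the query adds a second ordering column.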

Row API

Once you have a dataframe, you can use the Row API to access the values of individual rows. To get the rows, call the rows function on the dataframe.

let rows = top3.rows()?;

You can think of rows as an iterator that steps through the individual rows. The following functions are available on the rows object. To access row values, you must first get the columns, as shown below:

let count = top3.col("count")?;

To get the key column, use the helper function key:

let key = top3.key()?;

Use the next function to go to the next row. It returns true if it succeeds and false if there are no more rows.

while rows.next() {
...
}

As long as next returns true, you can access a value in the current row by passing a column. Each column has a type associated with it, so you can only access values that are compatible with the column type; otherwise an error is returned.

For example, to get a string value, use the str function. To get a u32 value, use the u32 function.

let word = rows.str(&key)?;
let count = rows.u32(&count)?;
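
The cursor behavior described above can be sketched with a small stand-in type in plain Rust. Everything here (Value, Rows, the use of a column index instead of a column handle, and the String error type) is hypothetical and chosen only to mirror the shape of the Row API: next advances and reports whether a row is available, and the typed accessors fail when the column holds a different type.

```rust
/// Hypothetical cell type; not part of SDF.
#[derive(Debug, Clone, PartialEq)]
enum Value {
    Str(String),
    U32(u32),
}

/// Hypothetical cursor over rows; not part of SDF.
struct Rows {
    data: Vec<Vec<Value>>, // one Vec<Value> per row
    pos: Option<usize>,    // None until `next` first returns true
}

impl Rows {
    fn new(data: Vec<Vec<Value>>) -> Self {
        Rows { data, pos: None }
    }

    /// Advance to the next row; returns false when there are no more rows.
    fn next(&mut self) -> bool {
        let candidate = self.pos.map_or(0, |p| p + 1);
        if candidate < self.data.len() {
            self.pos = Some(candidate);
            true
        } else {
            false
        }
    }

    /// Look up a cell in the current row, or explain why it is unavailable.
    fn cell(&self, col: usize) -> Result<&Value, String> {
        let row = self
            .pos
            .and_then(|p| self.data.get(p))
            .ok_or_else(|| "no current row; call next() first".to_string())?;
        row.get(col).ok_or_else(|| format!("no column at index {col}"))
    }

    /// Read a string value; errors if the column is not a string.
    fn str(&self, col: usize) -> Result<String, String> {
        match self.cell(col)? {
            Value::Str(s) => Ok(s.clone()),
            other => Err(format!("column {col} is not a string: {other:?}")),
        }
    }

    /// Read a u32 value; errors if the column is not a u32.
    fn u32(&self, col: usize) -> Result<u32, String> {
        match self.cell(col)? {
            Value::U32(n) => Ok(*n),
            other => Err(format!("column {col} is not a u32: {other:?}")),
        }
    }
}
```

With this stand-in, a loop of the form `while rows.next() { ... }` visits each row once, and asking for a u32 from a string column returns an error instead of a value.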