DataStream.transform
This is a rather Quokka-specific API that allows arbitrary transformations on a DataStream, similar to Spark RDD.map.
Each batch in the DataStream is going to be transformed according to a user defined function, which can produce a new batch.
The new batch can have a completely different schema or even a different length from the original batch, and the original data is considered lost,
or consumed by this transformation function. This can be used to implement user-defined aggregation functions (UDAFs). Note that in
cases where you are simply generating a new column from other columns for each row, i.e. a UDF, you probably want to use the
with_columns method instead.
A DataStream is implemented as a stream of batches. At runtime, your transformation function will be applied to each of those batches. However, there are no guarantees whatsoever on the sizes of these batches! You should make sure your logic is correct regardless of how the data is split into batches.
For example, if your DataStream consists of a column of numbers, and you wish to compute the sum of those numbers, you could first transform the DataStream to return just the sum of each batch, and then hook this DataStream up to a stateful operator that adds up all the sums.
You can use whatever libraries you have installed in your Python environment in this transformation function. If you are using this on a
cloud cluster, you have to make sure the necessary libraries are installed on each machine. You can use the utils
package in pyquokka to help you do this; in particular, check out manager.install_python_package.
Note that a transformation in the logical plan precludes any predicate pushdown or early projection past it, since the original columns are assumed to be lost, and we cannot directly establish correspondences between the input columns of a transformation and its output columns for the purposes of predicate pushdown or early projection. The user is required to supply a set or list of required columns, and we will select those columns (which can be pushed down) before we apply the transformation.
Parameters:

Name | Type | Description | Default
---|---|---|---
f | function | The transformation function. It must take a Polars DataFrame as input and output a Polars DataFrame. It must not make assumptions about the length of its input, and it does not have to emit output of any particular size. It must produce the same output columns for every possible input. | required
new_schema | list | The names of the columns of the Polars DataFrame that the transformation function produces. | required
required_columns | set | The names of the columns that are required for this transformation. This argument is mandatory because it is often trivial to supply and can often greatly speed things up. | required
foldable | bool | Whether the transformation can be executed as part of the batch post-processing of the previous operation in the execution graph. This is set to True by default. Correctly setting this flag requires some insight into how Quokka works. Lightweight functions generally benefit from being folded; heavyweight functions, or those whose efficiency improves with large input sizes, might benefit from not being folded. | True
Returns:
A new transformed DataStream with the supplied schema.
Examples:
Let's define a user-defined function that takes in a Polars DataFrame with a single column "text", converts it to a PyArrow table, and uses PyArrow compute functions to perform a word count. Note that 1) we have to convert the result back to a Polars DataFrame afterwards, 2) the function works regardless of input length, and 3) the output columns are the same regardless of the input.
>>> import polars
>>> import pyarrow as pa
>>> import pyarrow.compute as compute
>>> def udf2(x):
...     x = x.to_arrow()
...     da = compute.list_flatten(compute.ascii_split_whitespace(x["text"]))
...     c = da.value_counts().flatten()
...     return polars.from_arrow(pa.Table.from_arrays([c[0], c[1]], names=["word", "count"]))
Here is a trick to read in text files: just use read_csv with a separator you know won't appear in the data. The result will be a DataStream with one column.
>>> words = qc.read_csv("random_words.txt", ["text"], sep = "|")
Now transform words to counts. The result will be a DataStream with two columns, "word" and "count".
>>> counted = words.transform(udf2, new_schema = ["word", "count"], required_columns = {"text"}, foldable=True)
Source code in pyquokka/datastream.py