
DataStream.transform

This is a rather Quokka-specific API that allows arbitrary transformations on a DataStream, similar to Spark RDD.map. Each batch in the DataStream is transformed according to a user-defined function, which can produce a new batch. The new batch can have a completely different schema, or even a different length, from the original batch, and the original data is considered lost, i.e. consumed by this transformation function. This can be used to implement user-defined aggregation functions (UDAFs). Note that if you are simply generating a new column from other columns for each row, i.e. a row-wise UDF, you probably want to use the with_columns method instead.

A DataStream is implemented as a stream of batches. At runtime, your transformation function will be applied to each of those batches. However, there are no guarantees whatsoever on the sizes of these batches, so make sure your logic is correct regardless of batch size.

For example, if your DataStream consists of a column of numbers, and you wish to compute the sum of those numbers, you could first transform the DataStream to return just the sum of each batch, and then hook this DataStream up to a stateful operator that adds up all the sums.
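For instance, here is a minimal sketch of that per-batch partial sum. The DataStream variable stream and its column name "x" are hypothetical, and the partial sums would still need a downstream stateful operator (or aggregation) to be combined into the final answer:

>>> import polars
>>> def batch_sum(batch):
>>>    # emit a one-row DataFrame with the sum of column "x" for this batch;
>>>    # correct no matter how many rows the batch happens to have
>>>    return polars.DataFrame({"partial_sum": [batch["x"].sum()]})
>>> partial_sums = stream.transform(batch_sum, new_schema = ["partial_sum"], required_columns = {"x"})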

You can use whatever libraries you have installed in your Python environment in this transformation function. If you are using this on a cloud cluster, you have to make sure the necessary libraries are installed on each machine. You can use the utils package in pyquokka to help you do this, in particular, check out manager.install_python_package.
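For example, on a cluster created with pyquokka's cluster manager, something along these lines should work. The exact method signatures here are an assumption -- check the utils documentation for your version:

>>> from pyquokka.utils import QuokkaClusterManager
>>> manager = QuokkaClusterManager()
>>> cluster = manager.get_cluster_from_json("config.json")  # assumes a previously saved cluster config
>>> manager.install_python_package(cluster, "nltk")         # install the library on every machine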

Note that a transformation in the logical plan essentially precludes any predicate pushdown or early projection past it, since the original columns are assumed to be lost, and we cannot directly establish correspondences between the input columns to a transformation and its output columns for the purposes of predicate pushdown or early projection. The user is required to supply a set or list of required columns, and we will select those columns (a selection that can be pushed down) before we apply the transformation.

Parameters:

f (function), required

The transformation function. It must take a Polars DataFrame as input and output a Polars DataFrame. It must not make assumptions about the length of its input, it does not have to emit outputs of a specific size, and it must produce the same output columns for every possible input.

new_schema (list), required

The names of the columns of the Polars DataFrame that the transformation function produces.

required_columns (set), required

The names of the columns that are required for this transformation. This argument is mandatory because it is often trivial to supply and can often greatly speed things up.

foldable (bool), default True

Whether or not the transformation can be executed as part of the batch post-processing of the previous operation in the execution graph. True by default. Correctly setting this flag requires some insight into how Quokka works. Lightweight functions generally benefit from being folded; heavyweight functions, or those whose efficiency improves with large input sizes, might benefit from not being folded.
Return

A new transformed DataStream with the supplied schema.

Examples:

Let's define a user-defined function that takes in a Polars DataFrame with a single column "text", converts it to a PyArrow table, and uses PyArrow compute functions to perform a word count on it. Note that 1) we have to convert the result back to a Polars DataFrame afterwards, 2) the function works regardless of input length, and 3) the output columns are the same regardless of the input.

>>> import pyarrow as pa
>>> import pyarrow.compute as compute
>>> import polars
>>> def udf2(x):
>>>    x = x.to_arrow()
>>>    da = compute.list_flatten(compute.ascii_split_whitespace(x["text"]))
>>>    c = da.value_counts().flatten()
>>>    return polars.from_arrow(pa.Table.from_arrays([c[0], c[1]], names=["word","count"]))

Here is a trick for reading plain text files: just use read_csv with a separator you know won't appear in the data -- the result will be a DataStream with one column.

>>> words = qc.read_csv("random_words.txt", ["text"], sep = "|")

Now transform words to counts. The result will be a DataStream with two columns, "word" and "count".

>>> counted = words.transform( udf2, new_schema = ["word", "count"], required_columns = {"text"}, foldable=True)
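Since each output batch holds counts for just that batch, the same word can appear in multiple batches. One way to obtain the final counts (assuming the usual groupby/agg aggregation API on DataStreams) is to sum the per-batch counts:

>>> result = counted.groupby("word").agg({"count": "sum"}).collect()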
Source code in pyquokka/datastream.py (lines 652-739):
def transform(self, f, new_schema: list, required_columns: set, foldable=True):

    """
    This is a rather Quokka-specific API that allows arbitrary transformations on a DataStream, similar to Spark RDD.map.
    Each batch in the DataStream is going to be transformed according to a user defined function, which can produce a new batch.
    The new batch can have completely different schema or even length as the original batch, and the original data is considered lost,
    or consumed by this transformation function. This could be used to implement user-defined-aggregation-functions (UDAFs). Note in
    cases where you are simply generating a new column from other columns for each row, i.e. UDF, you probably want to use the 
    `with_columns` method instead. 

    A DataStream is implemented as a stream of batches. In the runtime, your transformation function will be applied to each of those batches.
    However, there are no guarantees whatsoever on the sizes of these batches! You should probably make sure your logic is correct
    regardless of the sizes of the batches. 

    For example, if your DataStream consists of a column of numbers, and you wish to compute the sum
    of those numbers, you could first transform the DataStream to return just the sum of each batch, and then hook this DataStream up to 
    a stateful operator that adds up all the sums. 

    You can use whatever libraries you have installed in your Python environment in this transformation function. If you are using this on a
    cloud cluster, you have to make sure the necessary libraries are installed on each machine. You can use the `utils` package in pyquokka to help
    you do this, in particular, check out `manager.install_python_package`.

    Note a transformation in the logical plan basically precludes any predicate pushdown or early projection past it, since the original columns 
    are assumed to be lost, and we cannot directly establish correspondences between the input columns to a transformation and its output 
    columns for the purposes of predicate pushdown or early projection. The user is required to supply a set or list of required columns,
    and we will select for those columns (which can be pushed down) before we apply the transformation. 

    Args:
        f (function): The transformation function. This transformation function must take as input a Polars DataFrame and output a Polars DataFrame. 
            The transformation function must not have expectations on the length of its input. Similarly, the transformation function does not 
            have to emit outputs of a specific size. The transformation function must produce the same output columns for every possible input.
        new_schema (list): The names of the columns of the Polars DataFrame that the transformation function produces. 
        required_columns (set): The names of the columns that are required for this transformation. This argument is made mandatory
            because it's often trivial to supply and can often greatly speed things up.
        foldable (bool): Whether or not the transformation can be executed as part of the batch post-processing of the previous operation in the 
            execution graph. This is set to True by default. Correctly setting this flag requires some insight into how Quokka works. Lightweight
            functions generally benefit from being folded. Heavyweight functions or those whose efficiency improve with large input sizes 
            might benefit from not being folded. 

    Return:
        A new transformed DataStream with the supplied schema.

    Examples:

        Let's define a user defined function that takes in a Polars DataFrame with a single column "text", converts it to a Pyarrow table,
        and uses nice Pyarrow compute functions to perform the word count on this Polars DataFrame. Note 1) we have to convert it 
        back to a Polars DataFrame afterwards, 2) the function works regardless of input length and 3) the output columns are the 
        same regardless of the input.

        >>> def udf2(x):
        >>>    x = x.to_arrow()
        >>>    da = compute.list_flatten(compute.ascii_split_whitespace(x["text"]))
        >>>    c = da.value_counts().flatten()
        >>>    return polars.from_arrow(pa.Table.from_arrays([c[0], c[1]], names=["word","count"]))

        This is a trick to read in text files, just use read_csv with a separator you know won't appear -- the result will just be DataStream with one column. 

        >>> words = qc.read_csv("random_words.txt", ["text"], sep = "|")

        Now transform words to counts. The result will be a DataStream with two columns, "word" and "count". 

        >>> counted = words.transform( udf2, new_schema = ["word", "count"], required_columns = {"text"}, foldable=True)

    """
    if type(required_columns) == list:
        required_columns = set(required_columns)
    assert type(required_columns) == set

    if self.materialized:
        df = self._get_materialized_df()
        df = f(df)
        return self.quokka_context.from_polars(df)

    select_stream = self.select(required_columns)

    return self.quokka_context.new_stream(
        sources={0: select_stream},
        partitioners={0: PassThroughPartitioner()},
        node=MapNode(
            schema=new_schema,
            schema_mapping={col: {-1: col} for col in new_schema},
            required_columns={0: required_columns},
            function=f,
            foldable=foldable
        ),
        schema=new_schema,

    )