
DataStream.collect

This triggers execution of the computational graph, similar to Spark's collect(). The result is returned to the client as a Polars DataFrame. As with Spark, this can be slow or run out of memory (OOM) if the result is very large.

If you want to compute a temporary result that will be reused in a later computation, use the compute() method instead.

Return

Polars DataFrame.

Examples:

The result is a Polars DataFrame, as if you had called polars.read_csv("my_csv.csv"):

>>> f = qc.read_csv("my_csv.csv")
>>> result = f.collect()
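The difference between collect() and compute() can be illustrated with a small toy model of lazy evaluation. This is a hypothetical sketch, not the pyquokka API: `ToyStream`, `runs`, and `expensive_scan` are invented names, and it assumes that compute() executes the graph once and yields a materialized stream, which the `self.materialized` check in the source below suggests. Each collect() on a lazy stream re-runs the deferred work, while a materialized stream hands back its stored result.

```python
from typing import Callable, List, Optional

Row = dict


class ToyStream:
    """Hypothetical stand-in for a lazy DataStream (not the pyquokka API)."""

    def __init__(self, producer: Callable[[], List[Row]], rows: Optional[List[Row]] = None):
        self._producer = producer  # deferred work: nothing runs at construction time
        self._rows = rows          # stored result, set only for materialized streams
        self.runs = 0              # how many times this stream executed its graph

    @property
    def materialized(self) -> bool:
        return self._rows is not None

    def _execute(self) -> List[Row]:
        self.runs += 1
        return self._producer()

    def collect(self) -> List[Row]:
        # Mirrors the source: a materialized stream returns its stored result directly.
        if self.materialized:
            return list(self._rows)
        # Otherwise the deferred graph runs now and the full result goes to the caller.
        return self._execute()

    def compute(self) -> "ToyStream":
        # Runs the graph once and keeps the result in a new, materialized stream.
        return ToyStream(self._producer, rows=self._execute())


def expensive_scan() -> List[Row]:
    # Hypothetical stand-in for reading and transforming "my_csv.csv".
    return [{"x": i, "x_sq": i * i} for i in range(5)]


lazy = ToyStream(expensive_scan)
lazy.collect()
lazy.collect()
print(lazy.runs)  # 2: each collect() re-executes the graph

cached = lazy.compute()  # executes once more, so lazy.runs becomes 3
cached.collect()
cached.collect()
print(lazy.runs, cached.runs)  # 3 0: the materialized stream never re-executes
```

The toy keeps results in memory on the caller's side, which is exactly why collecting a very large result is risky: everything must fit on the client.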
Source code in pyquokka/datastream.py
def collect(self):
    """
    This will trigger the execution of the computational graph, similar to Spark collect().
    The result will be a Polars DataFrame returned to the client.
    Like Spark, this will be slow or cause OOM if the result is very large!

    If you want to compute a temporary result that will be used in a future computation, use
    the `compute()` method instead.

    Return:
        Polars DataFrame.

    Examples:
        Result will be a Polars DataFrame, as if you did polars.read_csv("my_csv.csv")

        >>> f = qc.read_csv("my_csv.csv")
        >>> result = f.collect()
    """
    # Already materialized: return the stored result without executing the graph again.
    if self.materialized:
        return self._get_materialized_df()

    # Otherwise, register a dataset node for this stream and execute the graph
    # up to that node; the resulting DataFrame is returned to the client.
    dataset = self.quokka_context.new_dataset(self, self.schema)
    result = self.quokka_context.execute_node(dataset.source_node_id)
    return result