DataStream.agg

Aggregate this DataStream according to the defined aggregations without any pre-grouping. This is similar to Pandas df.agg(). The result will be one row.

The result is a DataStream that emits a single batch once the entire aggregation has finished, because no aggregation result can be produced before the whole dataset has been seen. You should therefore call .compute() or .collect() on this DataStream rather than chaining further operations such as .filter(), since those would not be pipelined anyway. Quokka returns a DataStream by default, instead of a Polars DataFrame or a Quokka DataSet, only so that you can call .explain() on it.
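To illustrate why nothing can be emitted early, here is a minimal pure-Python sketch of an aggregation over a stream of batches. It is not Quokka's implementation: the `Batch` type, the function name `streaming_sum_and_count`, and the output column names are hypothetical and chosen only for illustration.

```python
from typing import Dict, Iterable, Iterator, List

# Hypothetical stand-in for a batch of rows; Quokka's real batches are not plain lists.
Batch = List[Dict[str, float]]


def streaming_sum_and_count(batches: Iterable[Batch], column: str) -> Iterator[Dict[str, float]]:
    """Consume every incoming batch, then yield exactly one result row."""
    total = 0.0
    count = 0
    for batch in batches:
        for row in batch:
            total += row[column]
            count += 1
    # Only here, after the last batch, is the aggregate known to be final.
    yield {f"{column}_sum": total, "count": count}


batches = [
    [{"l_quantity": 1.0}, {"l_quantity": 2.0}],
    [{"l_quantity": 3.0}],
]
result = list(streaming_sum_and_count(batches, "l_quantity"))
print(result)
```

The generator produces one item no matter how many batches arrive, which mirrors the single output batch described above.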

Parameters:

Name: aggregations
Type: dict
Description: Similar to the dictionary argument of Pandas df.agg(). Each key is a column name, and each value is one of the strings "min", "max", "mean", "sum", "avg", or a list of such strings. To include the row count in the result, add the key "*" with the value "count". See the examples.
Default: required

Return:

A DataStream object that holds the aggregation result. It emits only one batch, containing the result, once the aggregation is done.

Examples:

>>> lineitem = qc.read_csv("lineitem.csv")
>>> d = lineitem.filter("l_shipdate <= date '1998-12-01' - interval '90' day")
>>> d = d.with_column("disc_price", lambda x: x["l_extendedprice"] * (1 - x["l_discount"]), required_columns={"l_extendedprice", "l_discount"})

The following computes the sum and average of the l_quantity and l_extendedprice columns, the sum of the disc_price column, the minimum of the l_discount column, and the total row count.

>>> f = d.agg({"l_quantity": ["sum", "avg"], "l_extendedprice": ["sum", "avg"], "disc_price": "sum", "l_discount": "min", "*": "count"})
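
For readers who want to see how an aggregations dictionary of this shape can be interpreted, the sketch below applies one to a small in-memory list of rows using only the standard library. It is an illustration, not Quokka's code: the helper `aggregate_rows`, the `REDUCERS` table, and the output column names (`<column>_<function>` and `count`) are assumptions made for this example.

```python
from statistics import mean

# Hypothetical mapping from aggregation names to plain Python reducers.
REDUCERS = {"min": min, "max": max, "sum": sum, "mean": mean, "avg": mean}


def aggregate_rows(rows, aggregations):
    """Reduce a list of row dicts to a single result row."""
    result = {}
    for column, funcs in aggregations.items():
        if column == "*":
            # The special key "*" only supports "count": the total number of rows.
            if funcs != "count":
                raise ValueError('"*" only supports "count"')
            result["count"] = len(rows)
            continue
        if isinstance(funcs, str):
            funcs = [funcs]  # accept a single name or a list of names
        values = [row[column] for row in rows]
        for func in funcs:
            # Assumed naming scheme; the real output names may differ.
            result[f"{column}_{func}"] = REDUCERS[func](values)
    return result


rows = [
    {"l_quantity": 10, "l_discount": 0.25},
    {"l_quantity": 20, "l_discount": 0.5},
    {"l_quantity": 30, "l_discount": 0.0},
]
summary = aggregate_rows(
    rows, {"l_quantity": ["sum", "avg"], "l_discount": "min", "*": "count"}
)
print(summary)
```

Whatever the number of input rows, the output is one row with one entry per requested aggregation.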
Source code in pyquokka/datastream.py
def agg(self, aggregations):

    """
    Aggregate this DataStream according to the defined aggregations without any pre-grouping. This is similar to Pandas `df.agg()`.
    The result will be one row.

    The result is a DataStream that will return a batch when the entire aggregation is done, since it's impossible to return any aggregation
    results without seeing the entire dataset. As a result, you should call `.compute()` or `.collect()` on this DataStream instead of doing 
    additional operations on it like `.filter()` since those won't be pipelined anyways. The only reason Quokka by default returns a DataStream
    instead of just returning a Polars DataFrame or a Quokka DataSet is so you can do `.explain()` on it.

    Args:
        aggregations (dict): similar to a dictionary argument to Pandas `df.agg()`. The key is the column name, where the value
            is a str that is "min", "max", "mean", "sum", "avg" or a list of such strings. If you desire to have the count column
            in your result, add a key "*" with value "count". Look at the examples.

    Return:
        A DataStream object that holds the aggregation result. It will only emit one batch, which is the result when it's done. 

    Examples:

        >>> lineitem = qc.read_csv("lineitem.csv")

        >>> d = lineitem.filter("l_shipdate <= date '1998-12-01' - interval '90' day")

        >>> d = d.with_column("disc_price", lambda x:x["l_extendedprice"] * (1 - x["l_discount"]), required_columns ={"l_extendedprice", "l_discount"})

        I want the sum and average of the l_quantity column and the l_extendedprice column, the sum of the disc_price column, the minimum of the l_discount
        column, and oh give me the total row count as well.

        >>> f = d.agg({"l_quantity":["sum","avg"], "l_extendedprice":["sum","avg"], "disc_price":"sum", "l_discount":"min","*":"count"})

    """

    return self._grouped_aggregate([], aggregations, None)
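
The method body delegates to `_grouped_aggregate` with an empty list as its first argument. Assuming that argument is the list of grouping keys (the listing does not show the helper's signature), the idea that an ungrouped aggregation is a grouped aggregation with no keys can be sketched in plain Python. The function `grouped_sum` below is hypothetical and unrelated to Quokka's internals.

```python
from collections import defaultdict


def grouped_sum(rows, keys, column):
    """Sum `column` per group, where a group is identified by the values of `keys`."""
    groups = defaultdict(float)
    for row in rows:
        # With an empty key list every row maps to the same key, the empty tuple.
        group_key = tuple(row[k] for k in keys)
        groups[group_key] += row[column]
    return dict(groups)


rows = [
    {"l_returnflag": "A", "l_quantity": 1.0},
    {"l_returnflag": "R", "l_quantity": 2.0},
    {"l_returnflag": "A", "l_quantity": 3.0},
]
by_flag = grouped_sum(rows, ["l_returnflag"], "l_quantity")
ungrouped = grouped_sum(rows, [], "l_quantity")
print(by_flag)
print(ungrouped)
```

With no keys there is exactly one group, so the result collapses to a single row, consistent with the one-row output documented for this method.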