
GroupedDataStream.agg

Aggregate this GroupedDataStream according to the defined aggregations. This is similar to Pandas df.groupby().agg(). The result will have as many rows as there are unique combinations of the group keys.

The result is a DataStream that will return a batch when the entire aggregation is done, since it's impossible to return any aggregation results without seeing the entire dataset. As a result, you should call .compute() or .collect() on this DataStream instead of doing additional operations on it like .filter(), since those won't be pipelined anyway. The only reason Quokka returns a DataStream by default, instead of a Polars DataFrame or a Quokka DataSet, is so you can call .explain() on it.
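
A minimal sketch of the intended pattern, assuming a hypothetical orders.csv with o_orderstatus and o_totalprice columns (the file and column names here are illustrative only, not part of the library's examples):

>>> orders = qc.read_csv("orders.csv")
>>> agged = orders.groupby(["o_orderstatus"]).agg({"o_totalprice": ["sum", "avg"], "*": "count"})
>>> agged.explain()              # optional: inspect the plan before running it
>>> result = agged.collect()     # materializes the single result batch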

Parameters:

aggregations (dict, required): similar to a dictionary argument to Pandas df.agg(). The key is the column name and the value is either a string ("min", "max", "mean", "sum", "avg") or a list of such strings. To include a row count column in the result, add the key "*" with value "count". See the examples below.
Return:

A DataStream object that holds the aggregation result. It emits only one batch, the final result, once the aggregation is done. You should call .collect() or .compute() on it: it is impossible to pipeline past an aggregation, so you might as well materialize the result right away.

Examples:

>>> lineitem = qc.read_csv("lineitem.csv")
>>> d = lineitem.filter("l_shipdate <= date '1998-12-01' - interval '90' day")
>>> d = d.with_column("disc_price", lambda x: x["l_extendedprice"] * (1 - x["l_discount"]), required_columns={"l_extendedprice", "l_discount"})

I want the sum and average of the l_quantity and l_extendedprice columns, the sum of the disc_price column, the minimum of the l_discount column, and the total row count as well, for each unique combination of l_returnflag and l_linestatus:

>>> f = d.groupby(["l_returnflag", "l_linestatus"]).agg({"l_quantity":["sum","avg"], "l_extendedprice":["sum","avg"], "disc_price":"sum", "l_discount":"min","*":"count"})
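
Since nothing can be pipelined past the aggregation, the natural next step (per the advice above) is to materialize f right away, optionally inspecting the plan first:

>>> f.explain()
>>> result = f.collect()
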
Source code in pyquokka/datastream.py
def agg(self, aggregations: dict):

    """
    Aggregate this GroupedDataStream according to the defined aggregations. This is similar to Pandas `df.groupby().agg()`.
    The result will have as many rows as there are unique combinations of the group keys.

    The result is a DataStream that will return a batch when the entire aggregation is done, since it's impossible to return any aggregation
    results without seeing the entire dataset. As a result, you should call `.compute()` or `.collect()` on this DataStream instead of doing 
    additional operations on it like `.filter()` since those won't be pipelined anyway. The only reason Quokka by default returns a DataStream
    instead of just returning a Polars DataFrame or a Quokka DataSet is so you can do `.explain()` on it.

    Args:
        aggregations (dict): similar to a dictionary argument to Pandas `df.agg()`. The key is the column name and the value
            is either a string ("min", "max", "mean", "sum", "avg") or a list of such strings. To include a row count column
            in the result, add the key "*" with value "count". Look at the examples.

    Return:
        A DataStream object that holds the aggregation result. It will only emit one batch, the final result, once the aggregation is done.
        You should call `.collect()` or `.compute()` on it: it is impossible to pipeline past an
        aggregation, so you might as well materialize the result right away.

    Examples:

        >>> lineitem = qc.read_csv("lineitem.csv")

        >>> d = lineitem.filter("l_shipdate <= date '1998-12-01' - interval '90' day")

        >>> d = d.with_column("disc_price", lambda x: x["l_extendedprice"] * (1 - x["l_discount"]), required_columns={"l_extendedprice", "l_discount"})

        I want the sum and average of the l_quantity and l_extendedprice columns, the sum of the disc_price column, the minimum of the l_discount
        column, and the total row count as well, for each unique combination of l_returnflag and l_linestatus:

        >>> f = d.groupby(["l_returnflag", "l_linestatus"]).agg({"l_quantity":["sum","avg"], "l_extendedprice":["sum","avg"], "disc_price":"sum", "l_discount":"min","*":"count"})
    """

    return self.source_data_stream._grouped_aggregate(self.groupby, aggregations, self.orderby)