Aggregate this GroupedDataStream according to the defined aggregations. This is similar to Pandas df.groupby().agg().
The result will have as many rows as there are unique combinations of group keys.
The result is a DataStream that returns a single batch once the entire aggregation is done, since it is impossible to return any aggregation
results without seeing the entire dataset. As a result, you should call .compute() or .collect() on this DataStream instead of chaining
additional operations like .filter() onto it, since those won't be pipelined anyway. The only reason Quokka by default returns a DataStream
instead of just returning a Polars DataFrame or a Quokka DataSet is so you can call .explain() on it.
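For example, a common pattern is to look at the plan on the returned DataStream and then materialize it immediately. The snippet below is a minimal sketch: the DataStream d and the column names "key" and "value" are hypothetical, and it assumes .explain() simply prints the query plan.
>>> f = d.groupby(["key"]).agg({"value": "sum"})
>>> f.explain()  # inspect the plan; this is the main reason agg() returns a DataStream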
Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| aggregations | dict | similar to a dictionary argument to Pandas df.agg(). The key is the column name, where the value is a str that is "min", "max", "mean", "sum", "avg" or a list of such strings. If you desire to have the count column in your result, add a key "*" with value "count". Look at the examples. | required |
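To make the accepted value shapes concrete, here is a sketch of an aggregations dictionary; the column names "price" and "qty" and the group key "key" are hypothetical. A value can be a single aggregation name or a list of names, and the special key "*" adds a row count.
>>> aggregations = {
...     "price": ["sum", "avg"],  # list of aggregation names for one column
...     "qty": "max",             # a single aggregation name
...     "*": "count"              # special key: include a row-count column
... }
>>> f = d.groupby(["key"]).agg(aggregations)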
Return
A DataStream object that holds the aggregation result. It will emit only one batch, the final result, once the aggregation is done.
You should call .collect() or .compute() on it, since it is impossible to pipeline past an aggregation, so you might as well materialize it right now.
Examples:
>>> lineitem = qc.read_csv("lineitem.csv")
>>> d = lineitem.filter("l_shipdate <= date '1998-12-01' - interval '90' day")
>>> d = d.with_column("disc_price", lambda x:x["l_extendedprice"] * (1 - x["l_discount"]), required_columns ={"l_extendedprice", "l_discount"})
I want the sum and average of the l_quantity and l_extendedprice columns, the sum of the disc_price column, the minimum of the l_discount
column, and, oh, the total row count as well, for each unique combination of l_returnflag and l_linestatus:
>>> f = d.groupby(["l_returnflag", "l_linestatus"]).agg({"l_quantity":["sum","avg"], "l_extendedprice":["sum","avg"], "disc_price":"sum", "l_discount":"min","*":"count"})
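Since nothing past the aggregation can be pipelined, you would typically materialize f right away instead of chaining more operators onto it. A sketch, assuming .collect() returns the materialized result as described above:
>>> result = f.collect()  # materialize the single output batch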
Source code in pyquokka/datastream.py
def agg(self, aggregations: dict):
"""
Aggregate this GroupedDataStream according to the defined aggregations. This is similar to Pandas `df.groupby().agg()`.
The result will have as many rows as there are unique combinations of group keys.
The result is a DataStream that returns a single batch once the entire aggregation is done, since it is impossible to return any aggregation
results without seeing the entire dataset. As a result, you should call `.compute()` or `.collect()` on this DataStream instead of chaining
additional operations like `.filter()` onto it, since those won't be pipelined anyway. The only reason Quokka by default returns a DataStream
instead of just returning a Polars DataFrame or a Quokka DataSet is so you can call `.explain()` on it.
Args:
aggregations (dict): similar to a dictionary argument to Pandas `df.agg()`. The key is the column name, where the value
is a str that is "min", "max", "mean", "sum", "avg" or a list of such strings. If you desire to have the count column
in your result, add a key "*" with value "count". Look at the examples.
Return:
A DataStream object that holds the aggregation result. It will only emit one batch, which is the result when it's done.
You should call `.collect()` or `.compute()` on it, since it is impossible to pipeline past an
aggregation, so you might as well materialize it right now.
Examples:
>>> lineitem = qc.read_csv("lineitem.csv")
>>> d = lineitem.filter("l_shipdate <= date '1998-12-01' - interval '90' day")
>>> d = d.with_column("disc_price", lambda x:x["l_extendedprice"] * (1 - x["l_discount"]), required_columns ={"l_extendedprice", "l_discount"})
I want the sum and average of the l_quantity and l_extendedprice columns, the sum of the disc_price column, the minimum of the l_discount
column, and, oh, the total row count as well, for each unique combination of l_returnflag and l_linestatus:
>>> f = d.groupby(["l_returnflag", "l_linestatus"]).agg({"l_quantity":["sum","avg"], "l_extendedprice":["sum","avg"], "disc_price":"sum", "l_discount":"min","*":"count"})
"""
return self.source_data_stream._grouped_aggregate(self.groupby, aggregations, self.orderby)