DataStream.agg_sql

This is the SQL version of agg. Instead of a dictionary, it takes a string of SQL aggregate expressions (comma-separated, each named with as, like in the examples). The string must be valid SQL, and the requirements are similar to those for transform_sql; see the examples. More exotic aggregations, such as count_distinct or percentile, may not work, so limit your aggregations to mean/max/min/sum/avg/count for now.
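To make the expected input concrete, here is a minimal stand-in using Python's built-in sqlite3 module. It assumes, as the examples suggest, that the string passed to agg_sql plays the role of the select list in a query with no GROUP BY. The helper global_agg and the toy lineitem rows are hypothetical, and Quokka does not execute queries this way; the point is only that an ungrouped aggregation produces a single result row, which matches the single batch agg_sql emits.

```python
import sqlite3

# Stand-in only: Quokka does not use sqlite3. This shows the shape of the
# string agg_sql expects (aggregate expressions, no SELECT/FROM/GROUP BY)
# and why an ungrouped aggregation yields exactly one row.
def global_agg(rows, columns, aggregations):
    """Hypothetical helper: evaluate `aggregations` over all rows, no GROUP BY."""
    conn = sqlite3.connect(":memory:")
    try:
        conn.execute(f"CREATE TABLE t ({', '.join(columns)})")
        placeholders = ", ".join("?" for _ in columns)
        conn.executemany(f"INSERT INTO t VALUES ({placeholders})", rows)
        # The aggregation string is dropped in where the select list goes.
        cur = conn.execute(f"SELECT {aggregations} FROM t")
        names = [d[0] for d in cur.description]
        return [dict(zip(names, r)) for r in cur.fetchall()]
    finally:
        conn.close()

# Toy data: (l_extendedprice, l_discount)
lineitem = [(100.0, 0.1), (200.0, 0.0), (50.0, 0.5)]
result = global_agg(
    lineitem,
    ["l_extendedprice", "l_discount"],
    "sum(l_extendedprice * (1 - l_discount)) as revenue, count(*) as count_order",
)
print(result)
```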

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| aggregations | str | A valid SQL string of aggregate expressions. The requirements are similar to those for transform_sql. | required |
Returns:

A DataStream object that holds the aggregation result. It emits exactly one batch, containing the final result once the aggregation is done.

Examples:

>>> f = d.agg_sql("sum(l_extendedprice * (1 - l_discount)) as revenue")
>>> f = d.agg_sql("count(*) as count_order")
>>> f = d.agg_sql('''
...     sum(case when o_orderpriority = '1-URGENT' or o_orderpriority = '2-HIGH' then 1 else 0 end) as high_line_count,
...     sum(case when o_orderpriority <> '1-URGENT' and o_orderpriority <> '2-HIGH' then 1 else 0 end) as low_line_count
... ''')
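The third example uses the CASE WHEN pattern for conditional counting: each row adds 1 to exactly one of the two sums, unless o_orderpriority is NULL, in which case both CASE conditions evaluate to unknown and the row counts toward neither. A plain-Python equivalent, using made-up priorities, spells out that logic; line_counts is a hypothetical helper, not part of pyquokka.

```python
# Plain-Python equivalent of the two CASE WHEN sums in the third example.
# The sample priorities are made-up toy data; None stands in for SQL NULL.
HIGH = {"1-URGENT", "2-HIGH"}

def line_counts(priorities):
    high = low = 0
    for p in priorities:
        if p is None:
            # SQL: comparisons with NULL are unknown, so both CASEs yield 0.
            continue
        if p in HIGH:
            high += 1
        else:
            low += 1
    return {"high_line_count": high, "low_line_count": low}

priorities = ["1-URGENT", "3-MEDIUM", "2-HIGH", None, "5-LOW"]
print(line_counts(priorities))
```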
Source code in pyquokka/datastream.py
def agg_sql(self, aggregations: str):
    """
    This is the SQL version of `agg`. Instead of a dictionary, it takes a string of SQL aggregate expressions
    (comma-separated, each named with `as`, like in the examples). The string must be valid SQL, and the
    requirements are similar to those for `transform_sql`; see the examples. More exotic aggregations, such as
    `count_distinct` or `percentile`, may not work, so limit your aggregations to mean/max/min/sum/avg/count for now.

    Args:
        aggregations (str): a valid SQL string of aggregate expressions. The requirements are similar to those for `transform_sql`.

    Returns:
        A DataStream object that holds the aggregation result. It emits exactly one batch, containing the final
        result once the aggregation is done.

    Examples:

        >>> f = d.agg_sql("sum(l_extendedprice * (1 - l_discount)) as revenue")

        >>> f = d.agg_sql("count(*) as count_order")

        >>> f = d.agg_sql('''
        ...     sum(case when o_orderpriority = '1-URGENT' or o_orderpriority = '2-HIGH' then 1 else 0 end) as high_line_count,
        ...     sum(case when o_orderpriority <> '1-URGENT' and o_orderpriority <> '2-HIGH' then 1 else 0 end) as low_line_count
        ... ''')

    """

    return self._grouped_aggregate_sql([], aggregations, None)
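agg_sql is a thin wrapper: it forwards to self._grouped_aggregate_sql with an empty list as the first argument, which suggests that a global aggregation is handled as a grouped aggregation with no group-by keys (the meaning of the trailing None is not documented here). The following pure-Python sketch illustrates that general idea with hypothetical helpers grouped_aggregate and global_aggregate; it is not Quokka's implementation. With no keys, every row maps to the same empty-tuple key, so there is exactly one group and one output row. One caveat of this sketch: on empty input it returns no rows, whereas standard SQL returns one row for an ungrouped aggregate, and the sketch does not model which behavior Quokka follows.

```python
from collections import defaultdict
from typing import Callable, Dict, Iterable, List, Sequence, Tuple

Row = Dict[str, object]

def grouped_aggregate(
    rows: Iterable[Row],
    keys: Sequence[str],
    aggs: Dict[str, Callable[[List[Row]], object]],
) -> List[Row]:
    """Hypothetical helper: group rows by `keys`, then apply each aggregate per group."""
    groups: Dict[Tuple[object, ...], List[Row]] = defaultdict(list)
    for row in rows:
        # With keys == [], every row gets the empty-tuple key -> a single group.
        groups[tuple(row[k] for k in keys)].append(row)
    out: List[Row] = []
    for key, members in groups.items():
        result: Row = dict(zip(keys, key))
        for name, fn in aggs.items():
            result[name] = fn(members)
        out.append(result)
    return out

def global_aggregate(rows: Iterable[Row], aggs) -> List[Row]:
    # Mirrors the shape of agg_sql's body: a grouped aggregation with no keys.
    return grouped_aggregate(rows, [], aggs)

rows = [
    {"l_returnflag": "A", "l_quantity": 5},
    {"l_returnflag": "N", "l_quantity": 7},
    {"l_returnflag": "A", "l_quantity": 3},
]
aggs = {
    "sum_qty": lambda rs: sum(r["l_quantity"] for r in rs),
    "count_order": len,
}
print(global_aggregate(rows, aggs))                     # one row for all input
print(grouped_aggregate(rows, ["l_returnflag"], aggs))  # one row per flag
```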